killbill-uncached

Changes

account/pom.xml 4(+4 -0)

beatrix/pom.xml 6(+3 -3)

entitlement/pom.xml 10(+10 -0)

invoice/pom.xml 4(+4 -0)

jaxrs/pom.xml 4(+4 -0)

junction/pom.xml 4(+4 -0)

overdue/pom.xml 5(+4 -1)

payment/pom.xml 4(+4 -0)

pom.xml 2(+1 -1)

server/pom.xml 4(+4 -0)

util/pom.xml 15(+15 -0)

util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java 123(+0 -123)

util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java 117(+0 -117)

util/src/main/java/com/ning/billing/util/bus/InMemoryInternalBus.java 131(+0 -131)

util/src/main/java/com/ning/billing/util/bus/PersistentBusConfig.java 48(+0 -48)

util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java 226(+0 -226)

util/src/main/java/com/ning/billing/util/config/PersistentQueueConfig.java 28(+0 -28)

util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java 138(+0 -138)

util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java 248(+0 -248)

util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java 155(+0 -155)

util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 43(+0 -43)

util/src/main/java/com/ning/billing/util/notificationq/DefaultUUIDNotificationKey.java 66(+0 -66)

util/src/main/java/com/ning/billing/util/notificationq/Notification.java 42(+0 -42)

util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java 38(+0 -38)

util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java 24(+0 -24)

util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java 99(+0 -99)

util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueConfig.java 48(+0 -48)

util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java 283(+0 -283)

util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java 97(+0 -97)

util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java 121(+0 -121)

util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java 206(+0 -206)

util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java 49(+0 -49)

util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java 30(+0 -30)

util/src/main/java/com/ning/billing/util/svcsapi/bus/InternalBus.java 103(+0 -103)

util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg 104(+0 -104)

util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg 162(+0 -162)

util/src/test/java/com/ning/billing/util/bus/TestEventBus.java 44(+0 -44)

util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java 239(+0 -239)

util/src/test/java/com/ning/billing/util/bus/TestPersistentEventBus.java 46(+0 -46)

util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java 157(+0 -157)

util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java 38(+0 -38)

util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java 64(+0 -64)

util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java 183(+0 -183)

util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java 91(+0 -91)

util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java 344(+0 -344)

util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg 21(+0 -21)

Details

account/pom.xml 4(+4 -0)

diff --git a/account/pom.xml b/account/pom.xml
index 886d9a0..e8b4697 100644
--- a/account/pom.xml
+++ b/account/pom.xml
@@ -72,6 +72,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
             <scope>provided</scope>
diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index e77b523..2928870 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -29,6 +29,8 @@ import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.account.api.user.DefaultAccountChangeEvent;
 import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
@@ -44,8 +46,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.events.AccountChangeInternalEvent;
 import com.ning.billing.util.events.AccountCreationInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.inject.Inject;
 
@@ -53,11 +53,11 @@ public class DefaultAccountDao extends EntityDaoBase<AccountModelDao, Account, A
 
     private static final Logger log = LoggerFactory.getLogger(DefaultAccountDao.class);
 
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
-    public DefaultAccountDao(final IDBI dbi, final InternalBus eventBus, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher,
+    public DefaultAccountDao(final IDBI dbi, final PersistentBus eventBus, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher,
                              final InternalCallContextFactory internalCallContextFactory, final NonEntityDao nonEntityDao) {
         super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), AccountSqlDao.class);
         this.eventBus = eventBus;
@@ -88,7 +88,7 @@ public class DefaultAccountDao extends EntityDaoBase<AccountModelDao, Account, A
                                                                                            context.getAccountRecordId(),
                                                                                            context.getTenantRecordId());
         try {
-            eventBus.postFromTransaction(creationEvent, entitySqlDaoWrapperFactory, rehydratedContext);
+            eventBus.postFromTransaction(creationEvent, entitySqlDaoWrapperFactory.getSqlDao());
         } catch (final EventBusException e) {
             log.warn("Failed to post account creation event for account " + savedAccount.getId(), e);
         }
@@ -140,7 +140,7 @@ public class DefaultAccountDao extends EntityDaoBase<AccountModelDao, Account, A
                                                                                              context.getAccountRecordId(),
                                                                                              context.getTenantRecordId());
                 try {
-                    eventBus.postFromTransaction(changeEvent, entitySqlDaoWrapperFactory, context);
+                    eventBus.postFromTransaction(changeEvent, entitySqlDaoWrapperFactory.getSqlDao());
                 } catch (final EventBusException e) {
                     log.warn("Failed to post account change event for account " + accountId, e);
                 }
@@ -176,7 +176,7 @@ public class DefaultAccountDao extends EntityDaoBase<AccountModelDao, Account, A
                                                                                              context.getAccountRecordId(), context.getTenantRecordId());
 
                 try {
-                    eventBus.postFromTransaction(changeEvent, entitySqlDaoWrapperFactory, context);
+                    eventBus.postFromTransaction(changeEvent, entitySqlDaoWrapperFactory.getSqlDao());
                 } catch (final EventBusException e) {
                     log.warn("Failed to post account change event for account " + accountId, e);
                 }
diff --git a/account/src/test/java/com/ning/billing/account/AccountTestSuiteNoDB.java b/account/src/test/java/com/ning/billing/account/AccountTestSuiteNoDB.java
index 1171013..d9ca697 100644
--- a/account/src/test/java/com/ning/billing/account/AccountTestSuiteNoDB.java
+++ b/account/src/test/java/com/ning/billing/account/AccountTestSuiteNoDB.java
@@ -24,13 +24,13 @@ import com.ning.billing.GuicyKillbillTestSuiteNoDB;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.account.dao.AccountDao;
 import com.ning.billing.account.glue.TestAccountModuleNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.util.audit.dao.AuditDao;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.customfield.dao.CustomFieldDao;
 import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.glue.RealImplementation;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.api.user.TagEventBuilder;
 import com.ning.billing.util.tag.dao.TagDao;
 import com.ning.billing.util.tag.dao.TagDefinitionDao;
@@ -55,7 +55,7 @@ public abstract class AccountTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected CustomFieldDao customFieldDao;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected TagDao tagDao;
     @Inject
diff --git a/account/src/test/java/com/ning/billing/account/AccountTestSuiteWithEmbeddedDB.java b/account/src/test/java/com/ning/billing/account/AccountTestSuiteWithEmbeddedDB.java
index f1dff1a..0bfec40 100644
--- a/account/src/test/java/com/ning/billing/account/AccountTestSuiteWithEmbeddedDB.java
+++ b/account/src/test/java/com/ning/billing/account/AccountTestSuiteWithEmbeddedDB.java
@@ -24,13 +24,13 @@ import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.account.dao.AccountDao;
 import com.ning.billing.account.glue.TestAccountModuleWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.util.audit.dao.AuditDao;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.customfield.dao.CustomFieldDao;
 import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.glue.RealImplementation;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.api.user.TagEventBuilder;
 import com.ning.billing.util.tag.dao.TagDao;
 import com.ning.billing.util.tag.dao.TagDefinitionDao;
@@ -55,7 +55,7 @@ public abstract class AccountTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected CustomFieldDao customFieldDao;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected TagDao tagDao;
     @Inject
diff --git a/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApiWithMocks.java b/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApiWithMocks.java
index 5de0e23..45a4f32 100644
--- a/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApiWithMocks.java
+++ b/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApiWithMocks.java
@@ -32,12 +32,12 @@ import com.ning.billing.account.api.DefaultAccountEmail;
 import com.ning.billing.account.dao.AccountDao;
 import com.ning.billing.account.dao.AccountModelDao;
 import com.ning.billing.account.dao.MockAccountDao;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.CallContextFactory;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.InternalTenantContext;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 public class TestDefaultAccountUserApiWithMocks extends AccountTestSuiteNoDB {
 
@@ -51,7 +51,7 @@ public class TestDefaultAccountUserApiWithMocks extends AccountTestSuiteNoDB {
 
     @BeforeMethod(groups = "fast")
     public void setUp() throws Exception {
-        accountDao = new MockAccountDao(Mockito.mock(InternalBus.class));
+        accountDao = new MockAccountDao(Mockito.mock(PersistentBus.class));
         accountUserApi = new DefaultAccountUserApi(factory, internalFactory, accountDao);
     }
 
diff --git a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
index b56fe0a..a154c0c 100644
--- a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
+++ b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
@@ -31,13 +31,13 @@ import com.ning.billing.account.api.DefaultAccount;
 import com.ning.billing.account.api.DefaultMutableAccountData;
 import com.ning.billing.account.api.user.DefaultAccountChangeEvent;
 import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.entity.dao.MockEntityDaoBase;
 import com.ning.billing.util.events.AccountChangeInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
@@ -47,10 +47,10 @@ import com.google.inject.Inject;
 public class MockAccountDao extends MockEntityDaoBase<AccountModelDao, Account, AccountApiException> implements AccountDao {
 
     private final MockEntityDaoBase<AccountEmailModelDao, AccountEmail, AccountApiException> accountEmailSqlDao = new MockEntityDaoBase<AccountEmailModelDao, AccountEmail, AccountApiException>();
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
 
     @Inject
-    public MockAccountDao(final InternalBus eventBus) {
+    public MockAccountDao(final PersistentBus eventBus) {
         this.eventBus = eventBus;
     }
 
@@ -62,7 +62,7 @@ public class MockAccountDao extends MockEntityDaoBase<AccountModelDao, Account, 
             final Long accountRecordId = getRecordId(account.getId(), context);
             final long tenantRecordId = context == null ? InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID
                                                         : context.getTenantRecordId();
-            eventBus.post(new DefaultAccountCreationEvent(account, null, accountRecordId, tenantRecordId), context);
+            eventBus.post(new DefaultAccountCreationEvent(account, null, accountRecordId, tenantRecordId));
         } catch (final EventBusException ex) {
             Assert.fail(ex.toString());
         }
@@ -80,7 +80,7 @@ public class MockAccountDao extends MockEntityDaoBase<AccountModelDao, Account, 
                                                                                      accountRecordId, tenantRecordId);
         if (changeEvent.hasChanges()) {
             try {
-                eventBus.post(changeEvent, context);
+                eventBus.post(changeEvent);
             } catch (final EventBusException ex) {
                 Assert.fail(ex.toString());
             }

beatrix/pom.xml 6(+3 -3)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 72bf9d0..8abd8e9 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -211,9 +211,9 @@
                         </goals>
                         <configuration>
                             <tasks>
-                                <copy file="${basedir}/../osgi-bundles/tests/beatrix/target/killbill-osgi-bundles-test-beatrix-${project.version}-jar-with-dependencies.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-test-beatrix-${project.version}-jar-with-dependencies.jar" />
-                                <copy file="${basedir}/../osgi-bundles/tests/payment/target/killbill-osgi-bundles-test-payment-${project.version}-jar-with-dependencies.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-test-payment-${project.version}-jar-with-dependencies.jar" />
-                                <copy file="${basedir}/../osgi-bundles/bundles/jruby/target/killbill-osgi-bundles-jruby-${project.version}.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-jruby-${project.version}.jar" />
+                                <copy file="${basedir}/../osgi-bundles/tests/beatrix/target/killbill-osgi-bundles-test-beatrix-${project.version}-jar-with-dependencies.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-test-beatrix-${project.version}-jar-with-dependencies.jar"></copy>
+                                <copy file="${basedir}/../osgi-bundles/tests/payment/target/killbill-osgi-bundles-test-payment-${project.version}-jar-with-dependencies.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-test-payment-${project.version}-jar-with-dependencies.jar"></copy>
+                                <copy file="${basedir}/../osgi-bundles/bundles/jruby/target/killbill-osgi-bundles-jruby-${project.version}.jar" tofile="${basedir}/src/test/resources/killbill-osgi-bundles-jruby-${project.version}.jar"></copy>
                             </tasks>
                         </configuration>
                     </execution>
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
index 95228cd..1cdba7b 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
@@ -13,6 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.beatrix;
 
 import javax.inject.Inject;
@@ -21,9 +22,9 @@ import com.ning.billing.beatrix.bus.api.BeatrixService;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.beatrix.extbus.BeatrixListener;
 import com.ning.billing.beatrix.extbus.PersistentExternalBus;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 
 public class DefaultBeatrixService implements BeatrixService {
@@ -31,11 +32,11 @@ public class DefaultBeatrixService implements BeatrixService {
     public static final String BEATRIX_SERVICE_NAME = "beatrix-service";
 
     private final BeatrixListener beatrixListener;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final ExternalBus externalBus;
 
     @Inject
-    public DefaultBeatrixService(final InternalBus eventBus, final ExternalBus externalBus, final BeatrixListener beatrixListener) {
+    public DefaultBeatrixService(final PersistentBus eventBus, final ExternalBus externalBus, final BeatrixListener beatrixListener) {
         this.eventBus = eventBus;
         this.externalBus = externalBus;
         this.beatrixListener = beatrixListener;
@@ -51,7 +52,7 @@ public class DefaultBeatrixService implements BeatrixService {
     public void registerForNotifications() {
         try {
             eventBus.register(beatrixListener);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to register to the EventBus!", e);
         }
     }
@@ -60,7 +61,7 @@ public class DefaultBeatrixService implements BeatrixService {
     public void unregisterForNotifications() {
         try {
             eventBus.unregister(beatrixListener);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
         }
     }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index d6080ec..19d3852 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.ObjectType;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.beatrix.extbus.dao.ExtBusEventEntry;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
 import com.ning.billing.util.Hostname;
@@ -47,7 +48,6 @@ import com.ning.billing.util.events.PaymentInfoInternalEvent;
 import com.ning.billing.util.events.SubscriptionInternalEvent;
 import com.ning.billing.util.events.UserTagCreationInternalEvent;
 import com.ning.billing.util.events.UserTagDeletionInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.common.eventbus.Subscribe;
 
@@ -70,14 +70,14 @@ public class BeatrixListener {
     @Subscribe
     public void handleAllInternalKillbillEvents(final BusInternalEvent event) {
         final ExtBusEventEntry externalEvent = computeExtBusEventEntryFromBusInternalEvent(event);
-        try {
 
-            if (externalEvent != null) {
-                final InternalCallContext internalContext =  internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+        if (externalEvent != null) {
+            final InternalCallContext internalContext =  internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "BeatrixListener", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+            try {
                 ((PersistentExternalBus) externalBus).post(externalEvent, internalContext);
+            } catch (EventBusException e) {
+               log.warn("Failed to dispatch external bus events", e);
             }
-        }  catch (EventBusException e) {
-            log.warn("Failed to post external bus event {} {} ", externalEvent.getExtBusType(), externalEvent.getObjectId());
         }
     }
 
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
index 7a78190..e99c527 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
@@ -22,7 +22,7 @@ import org.joda.time.DateTime;
 
 import com.ning.billing.ObjectType;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
+import com.ning.billing.queue.PersistentQueueEntryLifecycle;
 
 public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
 
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
index 2895e43..b4f0a6a 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
@@ -37,12 +37,12 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import com.ning.billing.ObjectType;
 import com.ning.billing.notification.plugin.api.ExtBusEventType;
+import com.ning.billing.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.InternalTenantContextBinder;
 import com.ning.billing.util.dao.BinderBase;
 import com.ning.billing.util.dao.MapperBase;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 @ExternalizedSqlViaStringTemplate3()
 public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 5392d4a..1e32c6e 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -32,18 +32,18 @@ import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.beatrix.extbus.dao.ExtBusEventEntry;
 import com.ning.billing.beatrix.extbus.dao.ExtBusSqlDao;
+import com.ning.billing.bus.PersistentBus.EventBusException;
+import com.ning.billing.bus.PersistentBusConfig;
 import com.ning.billing.notification.plugin.api.ExtBusEvent;
+import com.ning.billing.queue.PersistentQueueBase;
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.bus.DefaultBusService;
-import com.ning.billing.util.bus.PersistentBusConfig;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.queue.PersistentQueueBase;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
@@ -76,8 +76,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
             @Override
             public Thread newThread(final Runnable r) {
                 return new Thread(new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME),
-                        r,
-                        DefaultBusService.EVENT_BUS_TH_NAME);
+                                  r,
+                                  DefaultBusService.EVENT_BUS_TH_NAME);
             }
         }), config.getNbThreads(), config);
         this.dao = dbi.onDemand(ExtBusSqlDao.class);
@@ -150,7 +150,7 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         eventBusDelegate.unregister(handlerInstance);
     }
 
-    public void post(final ExtBusEventEntry event, final InternalCallContext context) throws EventBusException{
+    public void post(final ExtBusEventEntry event, final InternalCallContext context) throws EventBusException {
         dao.insertBusExtEvent(event, context);
     }
 }

entitlement/pom.xml 10(+10 -0)

diff --git a/entitlement/pom.xml b/entitlement/pom.xml
index 0af7924..ec44ba6 100644
--- a/entitlement/pom.xml
+++ b/entitlement/pom.xml
@@ -79,6 +79,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
             <scope>provided</scope>
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/BaseAligner.java b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/BaseAligner.java
new file mode 100644
index 0000000..7c73ea9
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/BaseAligner.java
@@ -0,0 +1,38 @@
+package com.ning.billing.entitlement.alignment;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.catalog.api.Duration;
+
+public class BaseAligner {
+
+    protected DateTime addDuration(final DateTime input, final Duration duration) {
+        return addOrRemoveDuration(input, duration, true);
+    }
+
+    protected DateTime removeDuration(final DateTime input, final Duration duration) {
+        return addOrRemoveDuration(input, duration, false);
+    }
+
+    private DateTime addOrRemoveDuration(final DateTime input, final Duration duration, boolean add) {
+        DateTime result = input;
+        switch (duration.getUnit()) {
+            case DAYS:
+                result = add ? result.plusDays(duration.getNumber()) : result.minusDays(duration.getNumber());
+                ;
+                break;
+
+            case MONTHS:
+                result = add ? result.plusMonths(duration.getNumber()) : result.minusMonths(duration.getNumber());
+                break;
+
+            case YEARS:
+                result = add ? result.plusYears(duration.getNumber()) : result.minusYears(duration.getNumber());
+                break;
+            case UNLIMITED:
+            default:
+                throw new RuntimeException("Trying to move to unlimited time period");
+        }
+        return result;
+    }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/MigrationPlanAligner.java b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/MigrationPlanAligner.java
index dd8b7b1..249c93e 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/MigrationPlanAligner.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/MigrationPlanAligner.java
@@ -33,7 +33,7 @@ import com.ning.billing.util.clock.DefaultClock;
 
 import com.google.inject.Inject;
 
-public class MigrationPlanAligner {
+public class MigrationPlanAligner extends BaseAligner {
 
     private final CatalogService catalogService;
 
@@ -87,7 +87,7 @@ public class MigrationPlanAligner {
                                                                              plan0.getName(), curPhaseType));
                 }
 
-                migrationStartDate = DefaultClock.removeDuration(input[1].getEffectiveDate(), curPhaseDuration);
+                migrationStartDate = removeDuration(input[1].getEffectiveDate(), curPhaseDuration);
                 events = getEventsOnFuturePhaseChangeMigration(plan0,
                                                                getPlanPhase(plan0, input[0].getPlanPhaseSpecifier().getPhaseType()),
                                                                input[0].getPlanPhaseSpecifier().getPriceListName(),
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
index cd1e510..a91aa69 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
@@ -49,7 +49,8 @@ import com.google.inject.Inject;
  * PlanAligner offers specific APIs to return the correct {@code TimedPhase} when creating, changing Plan or to compute
  * next Phase on current Plan.
  */
-public class PlanAligner {
+public class PlanAligner extends BaseAligner {
+
     private final CatalogService catalogService;
 
     @Inject
@@ -300,7 +301,7 @@ public class PlanAligner {
             // STEPH check for duration null instead TimeUnit UNLIMITED
             if (cur.getPhaseType() != PhaseType.EVERGREEN) {
                 final Duration curPhaseDuration = cur.getDuration();
-                nextPhaseStart = DefaultClock.addDuration(curPhaseStart, curPhaseDuration);
+                nextPhaseStart = addDuration(curPhaseStart, curPhaseDuration);
                 if (nextPhaseStart == null) {
                     throw new EntitlementError(String.format("Unexpected non ending UNLIMITED phase for plan %s",
                                                              plan.getName()));
@@ -337,4 +338,7 @@ public class PlanAligner {
                 throw new EntitlementError(String.format("Unexpected %s TimedPhase", which));
         }
     }
+
+
+
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
index 45393b2..4a980f2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionEvent.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public abstract class DefaultSubscriptionEvent extends DefaultBusInternalEvent implements SubscriptionInternalEvent {
+
     private final Long totalOrdering;
     private final UUID subscriptionId;
     private final UUID bundleId;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index e3dab3e..22e100b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -22,6 +22,8 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
@@ -40,20 +42,18 @@ import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.inject.Inject;
 
@@ -68,7 +68,7 @@ public class Engine implements EventListener, EntitlementService {
     private final EntitlementDao dao;
     private final PlanAligner planAligner;
     private final AddonUtils addonUtils;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final NotificationQueueService notificationQueueService;
     private final InternalCallContextFactory internalCallContextFactory;
     private NotificationQueue subscriptionEventQueue;
@@ -76,7 +76,7 @@ public class Engine implements EventListener, EntitlementService {
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
-                  final AddonUtils addonUtils, final InternalBus eventBus,
+                  final AddonUtils addonUtils, final PersistentBus eventBus,
                   final NotificationQueueService notificationQueueService,
                   final InternalCallContextFactory internalCallContextFactory,
                   final SubscriptionApiService apiService) {
@@ -171,7 +171,7 @@ public class Engine implements EventListener, EntitlementService {
             final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
                                                                                                       context.getUserToken(),
                                                                                                       context.getAccountRecordId(), context.getTenantRecordId());
-            eventBus.post(busEvent, context);
+            eventBus.post(busEvent);
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
         }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
index 0e5d522..1a98751 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
@@ -13,11 +13,12 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.entitlement.engine.core;
 
 import java.util.UUID;
 
-import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -27,10 +28,10 @@ public class EntitlementNotificationKey implements NotificationKey {
     private final UUID eventId;
     private final int seqId;
 
-    
+
     @JsonCreator
     public EntitlementNotificationKey(@JsonProperty("eventId") final UUID eventId,
-            @JsonProperty("seqId") final int seqId) {
+                                      @JsonProperty("seqId") final int seqId) {
         this.eventId = eventId;
         this.seqId = seqId;
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index 8128583..0b13bbc 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -37,6 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.ProductCategory;
@@ -70,6 +72,10 @@ import com.ning.billing.entitlement.events.user.ApiEventChange;
 import com.ning.billing.entitlement.events.user.ApiEventMigrateBilling;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -82,12 +88,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.events.RepairEntitlementInternalEvent;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
@@ -102,12 +102,12 @@ public class DefaultEntitlementDao implements EntitlementDao {
     private final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao;
     private final NotificationQueueService notificationQueueService;
     private final AddonUtils addonUtils;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final CatalogService catalogService;
 
     @Inject
     public DefaultEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
-                                 final NotificationQueueService notificationQueueService, final InternalBus eventBus, final CatalogService catalogService,
+                                 final NotificationQueueService notificationQueueService, final PersistentBus eventBus, final CatalogService catalogService,
                                  final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
         this.clock = clock;
         this.transactionalSqlDao = new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao);
@@ -296,8 +296,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 transactional.create(new EntitlementEventModelDao(nextPhase), context);
                 recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
                                                         nextPhase.getEffectiveDate(),
-                                                        new EntitlementNotificationKey(nextPhase.getId()),
-                                                        context);
+                                                        new EntitlementNotificationKey(nextPhase.getId()), context);
 
                 // Notify the Bus of the requested change
                 notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhase, context);
@@ -415,7 +414,6 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
 
-
     @Override
     public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -455,7 +453,6 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
 
-
     @Override
     public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -834,7 +831,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
                     // Note: we don't send a requested change event here, but a repair event
                     final RepairEntitlementInternalEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow(),
                                                                                                       context.getAccountRecordId(), context.getTenantRecordId());
-                    eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+                    eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getSqlDao());
                 } catch (EventBusException e) {
                     log.warn("Failed to post repair entitlement event for bundle " + bundleId, e);
                 }
@@ -904,7 +901,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
     // - IMM CANCEL or CHANGE
     //
     private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
-                                            final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
+                                                     final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
         try {
             final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
 
@@ -914,7 +911,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
                                                                                                       context.getAccountRecordId(), context.getTenantRecordId());
 
 
-            eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+            eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getSqlDao());
         } catch (EventBusException e) {
             log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
         }
@@ -923,7 +920,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
     private void notifyBusOfRequestedChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
                                             final EntitlementEvent nextEvent, final InternalCallContext context) {
         try {
-            eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent, context.getAccountRecordId(), context.getTenantRecordId()), entitySqlDaoWrapperFactory, context);
+            eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent, context.getAccountRecordId(), context.getTenantRecordId()), entitySqlDaoWrapperFactory.getSqlDao());
         } catch (EventBusException e) {
             log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
         }
@@ -934,13 +931,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getSqlDao(), effectiveDate, notificationKey, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
+
     private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
                                                   final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestEntitlementHelper.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestEntitlementHelper.java
index 801a5de..1a8915b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestEntitlementHelper.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestEntitlementHelper.java
@@ -585,6 +585,49 @@ public class TestEntitlementHelper {
 
 
 
+    public static DateTime addOrRemoveDuration(final DateTime input, final List<Duration> durations, final boolean add) {
+        DateTime result = input;
+        for (final Duration cur : durations) {
+            switch (cur.getUnit()) {
+                case DAYS:
+                    result = add ? result.plusDays(cur.getNumber()) : result.minusDays(cur.getNumber());
+                    break;
+
+                case MONTHS:
+                    result = add ? result.plusMonths(cur.getNumber()) : result.minusMonths(cur.getNumber());
+                    break;
+
+                case YEARS:
+                    result = add ? result.plusYears(cur.getNumber()) : result.minusYears(cur.getNumber());
+                    break;
+                case UNLIMITED:
+                default:
+                    throw new RuntimeException("Trying to move to unlimited time period");
+            }
+        }
+        return result;
+    }
+
+    public static DateTime addDuration(final DateTime input, final List<Duration> durations) {
+        return addOrRemoveDuration(input, durations, true);
+    }
+
+    public static DateTime removeDuration(final DateTime input, final List<Duration> durations) {
+        return addOrRemoveDuration(input, durations, false);
+    }
+
+    public static DateTime addDuration(final DateTime input, final Duration duration) {
+        final List<Duration> list = new ArrayList<Duration>();
+        list.add(duration);
+        return addOrRemoveDuration(input, list, true);
+    }
+
+    public static DateTime removeDuration(final DateTime input, final Duration duration) {
+        final List<Duration> list = new ArrayList<Duration>();
+        list.add(duration);
+        return addOrRemoveDuration(input, list, false);
+    }
+
 
 
     public static class EntitlementSubscriptionMigrationCaseWithCTD implements EntitlementSubscriptionMigrationCase {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
index 356efe1..38b128b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiAddOn.java
@@ -106,11 +106,11 @@ public class TestUserApiAddOn extends EntitlementTestSuiteWithEmbeddedDB {
             // SET CTD TO CANCEL IN FUTURE
             final DateTime now = clock.getUTCNow();
             final Duration aoCtd = testUtil.getDurationMonth(1);
-            final DateTime newAOChargedThroughDate = DefaultClock.addDuration(now, aoCtd);
+            final DateTime newAOChargedThroughDate = TestEntitlementHelper.addDuration(now, aoCtd);
             entitlementInternalApi.setChargedThroughDate(aoSubscription.getId(), newAOChargedThroughDate, internalCallContext);
 
             final Duration bpCtd = testUtil.getDurationMonth(11);
-            final DateTime newBPChargedThroughDate = DefaultClock.addDuration(now, bpCtd);
+            final DateTime newBPChargedThroughDate = TestEntitlementHelper.addDuration(now, bpCtd);
             entitlementInternalApi.setChargedThroughDate(baseSubscription.getId(), newBPChargedThroughDate, internalCallContext);
 
             baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId(), callContext);
@@ -189,7 +189,7 @@ public class TestUserApiAddOn extends EntitlementTestSuiteWithEmbeddedDB {
             final DateTime now = clock.getUTCNow();
             final Duration ctd = testUtil.getDurationMonth(1);
             // Why not just use clock.getUTCNow().plusMonths(1) ?
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(now, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(now, ctd);
             entitlementInternalApi.setChargedThroughDate(baseSubscription.getId(), newChargedThroughDate, internalCallContext);
             baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId(), callContext);
 
@@ -251,7 +251,7 @@ public class TestUserApiAddOn extends EntitlementTestSuiteWithEmbeddedDB {
             final DateTime now = clock.getUTCNow();
             final Duration ctd = testUtil.getDurationMonth(1);
             // Why not just use clock.getUTCNow().plusMonths(1) ?
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(now, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(now, ctd);
             entitlementInternalApi.setChargedThroughDate(baseSubscription.getId(), newChargedThroughDate, internalCallContext);
             baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId(), callContext);
 
@@ -325,7 +325,7 @@ public class TestUserApiAddOn extends EntitlementTestSuiteWithEmbeddedDB {
             // SET CTD TO CHANGE IN FUTURE
             final DateTime now = clock.getUTCNow();
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(now, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(now, ctd);
             entitlementInternalApi.setChargedThroughDate(baseSubscription.getId(), newChargedThroughDate, internalCallContext);
             baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId(), callContext);
 
@@ -388,7 +388,7 @@ public class TestUserApiAddOn extends EntitlementTestSuiteWithEmbeddedDB {
             // SET CTD TO CANCEL IN FUTURE
             final DateTime now = clock.getUTCNow();
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(now, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(now, ctd);
             entitlementInternalApi.setChargedThroughDate(baseSubscription.getId(), newChargedThroughDate, internalCallContext);
             baseSubscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(baseSubscription.getId(), callContext);
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
index d94d83a..f9c4d55 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancel.java
@@ -99,7 +99,7 @@ public class TestUserApiCancel extends EntitlementTestSuiteWithEmbeddedDB {
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // NEXT PHASE
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
@@ -113,7 +113,7 @@ public class TestUserApiCancel extends EntitlementTestSuiteWithEmbeddedDB {
 
             // SET CTD + RE READ SUBSCRIPTION + CHANGE PLAN
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(expectedPhaseTrialChange, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId(), callContext);
 
@@ -176,7 +176,7 @@ public class TestUserApiCancel extends EntitlementTestSuiteWithEmbeddedDB {
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // NEXT PHASE
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
@@ -220,7 +220,7 @@ public class TestUserApiCancel extends EntitlementTestSuiteWithEmbeddedDB {
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // NEXT PHASE
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 1, expectedPhaseTrialChange);
 
             // MOVE TO NEXT PHASE
@@ -233,7 +233,7 @@ public class TestUserApiCancel extends EntitlementTestSuiteWithEmbeddedDB {
 
             // SET CTD + RE READ SUBSCRIPTION + CHANGE PLAN
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(expectedPhaseTrialChange, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId(), callContext);
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index d3c1929..3427337 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -78,7 +78,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             clock.addDeltaFromReality(it.toDurationMillis());
 
             final DateTime futureNow = clock.getUTCNow();
-            final DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
+            final DateTime nextExpectedPhaseChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
             assertTrue(testListener.isCompleted(5000));
 
@@ -108,7 +108,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             // CREATE
             SubscriptionData subscription = testUtil.createSubscription(bundle, fromProd, fromTerm, fromPlanSet);
             final PlanPhase trialPhase = subscription.getCurrentPhase();
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // MOVE TO NEXT PHASE
@@ -121,7 +121,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
 
             // SET CTD
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(expectedPhaseTrialChange, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
 
             // RE READ SUBSCRIPTION + CHANGE PLAN
@@ -137,7 +137,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             checkChangePlan(subscription, fromProd, ProductCategory.BASE, fromTerm, PhaseType.DISCOUNT);
 
             // NEXT PHASE
-            final DateTime nextExpectedPhaseChange = DefaultClock.addDuration(expectedPhaseTrialChange, currentPhase.getDuration());
+            final DateTime nextExpectedPhaseChange = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, currentPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 2, nextExpectedPhaseChange);
 
             // ALSO VERIFY PENDING CHANGE EVENT
@@ -183,7 +183,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             assertTrue(testListener.isCompleted(5000));
 
             final PlanPhase currentPhase = subscription.getCurrentPhase();
-            final DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
+            final DateTime nextExpectedPhaseChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 1, nextExpectedPhaseChange);
 
             // NEXT PHASE
@@ -213,7 +213,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
 
             SubscriptionData subscription = testUtil.createSubscription(bundle, fromProd, fromTerm, fromPlanSet);
             final PlanPhase trialPhase = subscription.getCurrentPhase();
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             assertEquals(trialPhase.getPhaseType(), PhaseType.TRIAL);
 
             // MOVE TO NEXT PHASE
@@ -226,7 +226,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
 
             // SET CTD
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(expectedPhaseTrialChange, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
 
             // RE READ SUBSCRIPTION + CHECK CURRENT PHASE
@@ -271,7 +271,7 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             assertFalse(testListener.isCompleted(3000));
             testListener.reset();
 
-            final DateTime nextExpectedPhaseChange = DefaultClock.addDuration(newChargedThroughDate, currentPhase.getDuration());
+            final DateTime nextExpectedPhaseChange = TestEntitlementHelper.addDuration(newChargedThroughDate, currentPhase.getDuration());
             testUtil.checkNextPhaseChange(subscription, 1, nextExpectedPhaseChange);
 
             // MOVE TIME RIGHT AFTER NEXT EXPECTED PHASE CHANGE
@@ -306,9 +306,9 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             final List<Duration> durationList = new ArrayList<Duration>();
             durationList.add(trialPhase.getDuration());
             //durationList.add(subscription.getCurrentPhase().getDuration());
-            final DateTime startDiscountPhase = DefaultClock.addDuration(subscription.getStartDate(), durationList);
+            final DateTime startDiscountPhase = TestEntitlementHelper.addDuration(subscription.getStartDate(), durationList);
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(startDiscountPhase, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(startDiscountPhase, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId(), callContext);
 
@@ -355,9 +355,9 @@ public class TestUserApiChangePlan extends EntitlementTestSuiteWithEmbeddedDB {
             // SET CTD
             final List<Duration> durationList = new ArrayList<Duration>();
             durationList.add(trialPhase.getDuration());
-            final DateTime startDiscountPhase = DefaultClock.addDuration(subscription.getStartDate(), durationList);
+            final DateTime startDiscountPhase = TestEntitlementHelper.addDuration(subscription.getStartDate(), durationList);
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(startDiscountPhase, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(startDiscountPhase, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
             subscription = (SubscriptionData) entitlementApi.getSubscriptionFromId(subscription.getId(), callContext);
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
index ef3e65f..66d440f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCreate.java
@@ -172,7 +172,7 @@ public class TestUserApiCreate extends EntitlementTestSuiteWithEmbeddedDB {
             assertTrue(events.size() == 1);
             assertTrue(events.get(0) instanceof PhaseEvent);
             final DateTime nextPhaseChange = ((PhaseEvent) events.get(0)).getEffectiveDate();
-            final DateTime nextExpectedPhaseChange = DefaultClock.addDuration(subscription.getStartDate(), currentPhase.getDuration());
+            final DateTime nextExpectedPhaseChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), currentPhase.getDuration());
             assertEquals(nextPhaseChange, nextExpectedPhaseChange);
 
             testListener.pushExpectedEvent(NextEvent.PHASE);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
index f146268..eee8561 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiError.java
@@ -189,9 +189,9 @@ public class TestUserApiError extends EntitlementTestSuiteNoDB {
             assertTrue(testListener.isCompleted(5000));
 
             // SET CTD TO CANCEL IN FUTURE
-            final DateTime expectedPhaseTrialChange = DefaultClock.addDuration(subscription.getStartDate(), trialPhase.getDuration());
+            final DateTime expectedPhaseTrialChange = TestEntitlementHelper.addDuration(subscription.getStartDate(), trialPhase.getDuration());
             final Duration ctd = testUtil.getDurationMonth(1);
-            final DateTime newChargedThroughDate = DefaultClock.addDuration(expectedPhaseTrialChange, ctd);
+            final DateTime newChargedThroughDate = TestEntitlementHelper.addDuration(expectedPhaseTrialChange, ctd);
             entitlementInternalApi.setChargedThroughDate(subscription.getId(), newChargedThroughDate, internalCallContext);
 
             subscription = entitlementApi.getSubscriptionFromId(subscription.getId(), tenantContext);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index dd2e49a..5a24ff0 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -49,15 +49,15 @@ import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 
 import com.google.inject.Inject;
 
@@ -245,11 +245,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     private Subscription buildSubscription(final SubscriptionData in, final InternalTenantContext context) {
-            final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
-            if (events.size() > 0) {
-                subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
-            }
-            return subscription;
+        final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
+        if (events.size() > 0) {
+            subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
+        }
+        return subscription;
 
     }
 
@@ -417,7 +417,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey, context);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao.getSqlDao(), effectiveDate, notificationKey, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 787fe8e..ca7e005 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -18,20 +18,21 @@ package com.ning.billing.entitlement.engine.dao;
 
 import org.skife.jdbi.v2.IDBI;
 
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
-import com.ning.billing.util.dao.NonEntityDao;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.dao.NonEntityDao;
 
 import com.google.inject.Inject;
 
 public class MockEntitlementDaoSql extends DefaultEntitlementDao {
+
     @Inject
     public MockEntitlementDaoSql(final IDBI dbi, final Clock clock, final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
-                                 final InternalBus eventBus, final CatalogService catalogService, final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+                                 final PersistentBus eventBus, final CatalogService catalogService, final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
         super(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService, cacheControllerDispatcher, nonEntityDao);
     }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/TestEngineModuleNoDB.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/TestEngineModuleNoDB.java
index 1e9f07d..e0c25a3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/TestEngineModuleNoDB.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/TestEngineModuleNoDB.java
@@ -25,10 +25,10 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
 import com.ning.billing.entitlement.engine.dao.RepairEntitlementDao;
 import com.ning.billing.mock.glue.MockNonEntityDaoModule;
+import com.ning.billing.notificationq.MockNotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueConfig;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.bus.InMemoryBusModule;
-import com.ning.billing.util.notificationq.MockNotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueConfig;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.name.Names;
 

invoice/pom.xml 4(+4 -0)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index c3f6064..a395bf5 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -85,6 +85,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
             <scope>provided</scope>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java b/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
index 4536b58..b7a91aa 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
@@ -16,14 +16,14 @@
 
 package com.ning.billing.invoice.api;
 
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.invoice.InvoiceListener;
 import com.ning.billing.invoice.InvoiceTagHandler;
 import com.ning.billing.invoice.notification.NextBillingDateNotifier;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 
 import com.google.inject.Inject;
 
@@ -33,10 +33,10 @@ public class DefaultInvoiceService implements InvoiceService {
     private final NextBillingDateNotifier dateNotifier;
     private final InvoiceListener invoiceListener;
     private final InvoiceTagHandler tagHandler;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
 
     @Inject
-    public DefaultInvoiceService(final InvoiceListener invoiceListener, final InvoiceTagHandler tagHandler, final InternalBus eventBus, final NextBillingDateNotifier dateNotifier) {
+    public DefaultInvoiceService(final InvoiceListener invoiceListener, final InvoiceTagHandler tagHandler, final PersistentBus eventBus, final NextBillingDateNotifier dateNotifier) {
         this.invoiceListener = invoiceListener;
         this.tagHandler = tagHandler;
         this.eventBus = eventBus;
@@ -53,7 +53,7 @@ public class DefaultInvoiceService implements InvoiceService {
         try {
             eventBus.register(invoiceListener);
             eventBus.register(tagHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to register to the EventBus!", e);
         }
         dateNotifier.initialize();
@@ -69,7 +69,7 @@ public class DefaultInvoiceService implements InvoiceService {
         try {
             eventBus.unregister(invoiceListener);
             eventBus.unregister(tagHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus .EventBusException e) {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
         }
         dateNotifier.stop();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index fa3e420..de616ea 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -32,6 +32,8 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.InvoiceDispatcher;
 import com.ning.billing.invoice.api.Invoice;
@@ -54,8 +56,6 @@ import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.TenantContext;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.Tag;
 
@@ -74,10 +74,10 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
     private final TagInternalApi tagApi;
     private final HtmlInvoiceGenerator generator;
     private final InternalCallContextFactory internalCallContextFactory;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
 
     @Inject
-    public DefaultInvoiceUserApi(final InvoiceDao dao, final InvoiceDispatcher dispatcher, final AccountInternalApi accountUserApi, final InternalBus eventBus,
+    public DefaultInvoiceUserApi(final InvoiceDao dao, final InvoiceDispatcher dispatcher, final AccountInternalApi accountUserApi, final PersistentBus eventBus,
                                  final TagInternalApi tagApi, final HtmlInvoiceGenerator generator, final InternalCallContextFactory internalCallContextFactory) {
         this.dao = dao;
         this.dispatcher = dispatcher;
@@ -312,7 +312,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
 
     private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final InternalCallContext context) {
         try {
-            eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()), context);
+            eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()));
         } catch (EventBusException e) {
             log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index ef2512c..a2dcf4a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -30,6 +30,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
@@ -47,8 +49,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.inject.Inject;
 
@@ -57,14 +57,14 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
     private static final Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
 
     private final NextBillingDatePoster nextBillingDatePoster;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final InvoiceDaoHelper invoiceDaoHelper;
     private final CBADao cbaDao;
 
     @Inject
     public DefaultInvoiceDao(final IDBI dbi,
                              final NextBillingDatePoster nextBillingDatePoster,
-                             final InternalBus eventBus,
+                             final PersistentBus eventBus,
                              final Clock clock,
                              final CacheControllerDispatcher cacheControllerDispatcher,
                              final NonEntityDao nonEntityDao) {
@@ -72,7 +72,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         this.nextBillingDatePoster = nextBillingDatePoster;
         this.eventBus = eventBus;
         this.invoiceDaoHelper = new InvoiceDaoHelper();
-        this.cbaDao =  new CBADao();
+        this.cbaDao = new CBADao();
     }
 
     @Override
@@ -308,9 +308,9 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
 
                 // Retrieve the amounts to adjust, if needed
                 final Map<UUID, BigDecimal> invoiceItemIdsWithAmounts = invoiceDaoHelper.computeItemAdjustments(payment.getInvoiceId().toString(),
-                                                                                               entitySqlDaoWrapperFactory,
-                                                                                               invoiceItemIdsWithNullAmounts,
-                                                                                               context);
+                                                                                                                entitySqlDaoWrapperFactory,
+                                                                                                                invoiceItemIdsWithNullAmounts,
+                                                                                                                context);
 
                 // Compute the actual amount to refund
                 final BigDecimal requestedPositiveAmount = invoiceDaoHelper.computePositiveRefundAmount(payment, requestedRefundAmount, invoiceItemIdsWithAmounts);
@@ -357,8 +357,8 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                     for (final UUID invoiceItemId : invoiceItemIdsWithAmounts.keySet()) {
                         final BigDecimal adjAmount = invoiceItemIdsWithAmounts.get(invoiceItemId);
                         final InvoiceItemModelDao item = invoiceDaoHelper.createAdjustmentItem(entitySqlDaoWrapperFactory, invoice.getId(), invoiceItemId, adjAmount,
-                                                                              invoice.getCurrency(), context.getCreatedDate().toLocalDate(),
-                                                                              context);
+                                                                                               invoice.getCurrency(), context.getCreatedDate().toLocalDate(),
+                                                                                               context);
                         transInvoiceItemDao.create(item, context);
                     }
                 }
@@ -374,7 +374,6 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
     }
 
 
-
     @Override
     public InvoicePaymentModelDao postChargeback(final UUID invoicePaymentId, final BigDecimal amount, final InternalCallContext context) throws InvoiceApiException {
 
@@ -582,7 +581,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
             @Override
             public InvoiceItemModelDao inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final InvoiceItemModelDao invoiceItemAdjustment = invoiceDaoHelper.createAdjustmentItem(entitySqlDaoWrapperFactory, invoiceId, invoiceItemId, positiveAdjAmount,
-                                                                                       currency, effectiveDate, context);
+                                                                                                        currency, effectiveDate, context);
                 invoiceDaoHelper.insertItem(entitySqlDaoWrapperFactory, invoiceItemAdjustment, context);
 
                 cbaDao.doCBAComplexity(accountId, entitySqlDaoWrapperFactory, context);
@@ -712,7 +711,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                                               final UUID userToken, final InternalCallContext context) {
         try {
             eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken, context.getAccountRecordId(), context.getTenantRecordId()),
-                                         entitySqlDaoWrapperFactory, context);
+                                         entitySqlDaoWrapperFactory.getSqlDao());
         } catch (EventBusException e) {
             log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index 4d6b261..e901f0a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.invoice.api.Invoice;
@@ -71,8 +73,6 @@ import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.junction.BillingEventSet;
 import com.ning.billing.util.svcapi.junction.BillingInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -92,7 +92,7 @@ public class InvoiceDispatcher {
     private final InvoiceDao invoiceDao;
     private final InvoiceNotifier invoiceNotifier;
     private final GlobalLocker locker;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final Clock clock;
 
     @Inject
@@ -102,7 +102,7 @@ public class InvoiceDispatcher {
                              final InvoiceDao invoiceDao,
                              final InvoiceNotifier invoiceNotifier,
                              final GlobalLocker locker,
-                             final InternalBus eventBus,
+                             final PersistentBus eventBus,
                              final Clock clock) {
         this.generator = generator;
         this.billingApi = billingApi;
@@ -193,7 +193,7 @@ public class InvoiceDispatcher {
                 }
             } else {
                 log.info("Generated invoice {} with {} items for accountId {} and targetDate {} (targetDateTime {})", new Object[]{invoice.getId(), invoice.getNumberOfItems(),
-                                                                                                                                   accountId, targetDate, targetDateTime});
+                        accountId, targetDate, targetDateTime});
                 if (!dryRun) {
 
                     // Extract the set of invoiceId for which we see items that don't belong to current generated invoice
@@ -304,7 +304,7 @@ public class InvoiceDispatcher {
 
     private void postEvent(final BusInternalEvent event, final UUID accountId, final InternalCallContext context) {
         try {
-            eventBus.post(event, context);
+            eventBus.post(event);
         } catch (EventBusException e) {
             log.error(String.format("Failed to post event %s for account %s", event.getBusEventType(), accountId), e);
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 5b93235..abe263a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -26,14 +26,14 @@ import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.invoice.InvoiceListener;
 import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.config.InvoiceConfig;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 
 import com.google.inject.Inject;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index a3eef35..32ba48c 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -24,15 +24,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 
 import com.google.inject.Inject;
 
@@ -61,8 +61,8 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
             log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
 
-            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime,
-                                                                     new NextBillingDateNotificationKey(subscriptionId), context);
+            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getSqlDao(), futureNotificationTime,
+                                                                     new NextBillingDateNotificationKey(subscriptionId), context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
         } catch (IOException e) {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotificationKey.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotificationKey.java
index b1fec2e..d184ca7 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotificationKey.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotificationKey.java
@@ -18,7 +18,7 @@ package com.ning.billing.invoice.notification;
 
 import java.util.UUID;
 
-import com.ning.billing.util.notificationq.DefaultUUIDNotificationKey;
+import com.ning.billing.notificationq.DefaultUUIDNotificationKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
index 38ba037..dacb5b7 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -16,8 +16,8 @@
 
 package com.ning.billing.invoice.notification;
 
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 
 public interface NextBillingDateNotifier {
 
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 7f01355..c9df7af 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -29,25 +29,25 @@ import javax.annotation.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
 
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.invoice.api.user.DefaultInvoiceCreationEvent;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Inject;
 
 public class MockInvoiceDao implements InvoiceDao {
 
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final Object monitor = new Object();
     private final Map<UUID, InvoiceModelDao> invoices = new LinkedHashMap<UUID, InvoiceModelDao>();
     private final Map<UUID, InvoiceItemModelDao> items = new LinkedHashMap<UUID, InvoiceItemModelDao>();
     private final Map<UUID, InvoicePaymentModelDao> payments = new LinkedHashMap<UUID, InvoicePaymentModelDao>();
 
     @Inject
-    public MockInvoiceDao(final InternalBus eventBus) {
+    public MockInvoiceDao(final PersistentBus eventBus) {
         this.eventBus = eventBus;
     }
 
@@ -66,8 +66,8 @@ public class MockInvoiceDao implements InvoiceDao {
         try {
             eventBus.post(new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
                                                           InvoiceModelDaoHelper.getBalance(invoice), invoice.getCurrency(),
-                                                          null, 1L, 1L), context);
-        } catch (InternalBus.EventBusException ex) {
+                                                          null, 1L, 1L));
+        } catch (PersistentBus.EventBusException ex) {
             throw new RuntimeException(ex);
         }
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDaoUnit.java b/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDaoUnit.java
index 344e9e6..74118ff 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDaoUnit.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDaoUnit.java
@@ -21,24 +21,12 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.Transaction;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.ning.billing.ErrorCode;
-import com.ning.billing.dao.MockNonEntityDao;
 import com.ning.billing.invoice.InvoiceTestSuiteNoDB;
-import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
-import com.ning.billing.invoice.notification.NextBillingDatePoster;
-import com.ning.billing.util.cache.CacheControllerDispatcher;
-import com.ning.billing.util.callcontext.InternalTenantContext;
-import com.ning.billing.util.entity.dao.EntitySqlDao;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.common.collect.ImmutableMap;
 
diff --git a/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteNoDB.java b/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteNoDB.java
index f6e1194..b71e489 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteNoDB.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteNoDB.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.invoice.api.InvoiceMigrationApi;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
 import com.ning.billing.invoice.api.InvoiceUserApi;
@@ -42,7 +43,6 @@ import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.junction.BillingInternalApi;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -53,7 +53,7 @@ public abstract class InvoiceTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     private static final Logger log = LoggerFactory.getLogger(InvoiceTestSuiteNoDB.class);
 
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected CacheControllerDispatcher controllerDispatcher;
     @Inject
diff --git a/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java b/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
index ef6ba5a..a3060f4 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.DefaultInvoiceService;
 import com.ning.billing.invoice.api.InvoiceMigrationApi;
@@ -36,18 +37,17 @@ import com.ning.billing.invoice.dao.InvoiceDao;
 import com.ning.billing.invoice.generator.InvoiceGenerator;
 import com.ning.billing.invoice.glue.TestInvoiceModuleWithEmbeddedDb;
 import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.api.TagUserApi;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.globallocker.GlobalLocker;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.junction.BillingInternalApi;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -62,7 +62,7 @@ public abstract class InvoiceTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected InvoiceService invoiceService;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected CacheControllerDispatcher controllerDispatcher;
     @Inject
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index e9d1358..18ed89a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -26,8 +26,8 @@ import org.testng.annotations.Test;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.invoice.InvoiceTestSuiteWithEmbeddedDB;
 import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.notificationq.NotificationQueue;
 import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.notificationq.NotificationQueue;
 
 import static com.jayway.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.MINUTES;
@@ -46,7 +46,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB 
         final NotificationQueue nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
 
 
-        nextBillingQueue.recordFutureNotification(now, new NextBillingDateNotificationKey(subscriptionId), internalCallContext);
+        nextBillingQueue.recordFutureNotification(now, new NextBillingDateNotificationKey(subscriptionId), internalCallContext.getUserToken(), internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
         // Move time in the future after the notification effectiveDate
         ((ClockMock) clock).setDeltaFromReality(3000);

jaxrs/pom.xml 4(+4 -0)

diff --git a/jaxrs/pom.xml b/jaxrs/pom.xml
index f5c9265..e185516 100644
--- a/jaxrs/pom.xml
+++ b/jaxrs/pom.xml
@@ -56,6 +56,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
         </dependency>

junction/pom.xml 4(+4 -0)

diff --git a/junction/pom.xml b/junction/pom.xml
index 998c94b..4453b23 100644
--- a/junction/pom.xml
+++ b/junction/pom.xml
@@ -79,6 +79,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
diff --git a/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteNoDB.java b/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteNoDB.java
index d9c4aa9..4bbb0b2 100644
--- a/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteNoDB.java
+++ b/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteNoDB.java
@@ -21,6 +21,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.junction.block.BlockingChecker;
@@ -34,7 +35,6 @@ import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.junction.BillingInternalApi;
 import com.ning.billing.util.svcapi.junction.BlockingInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.dao.TagDao;
 
 import com.google.inject.Guice;
@@ -65,7 +65,7 @@ public abstract class JunctionTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected EntitlementInternalApi entitlementInternalApi;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected TagDao tagDao;
     @Inject
diff --git a/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteWithEmbeddedDB.java b/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteWithEmbeddedDB.java
index afa132c..f6efaf9 100644
--- a/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteWithEmbeddedDB.java
+++ b/junction/src/test/java/com/ning/billing/junction/JunctionTestSuiteWithEmbeddedDB.java
@@ -21,6 +21,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.junction.block.BlockingChecker;
@@ -33,7 +34,6 @@ import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.junction.BillingInternalApi;
 import com.ning.billing.util.svcapi.junction.BlockingInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.dao.TagDao;
 
 import com.google.inject.Guice;
@@ -62,7 +62,7 @@ public abstract class JunctionTestSuiteWithEmbeddedDB extends GuicyKillbillTestS
     @Inject
     protected EntitlementInternalApi entitlementInternalApi;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected TagDao tagDao;
     @Inject
diff --git a/osgi-bundles/bundles/logger/pom.xml b/osgi-bundles/bundles/logger/pom.xml
index ce33426..bfb1baf 100644
--- a/osgi-bundles/bundles/logger/pom.xml
+++ b/osgi-bundles/bundles/logger/pom.xml
@@ -56,7 +56,7 @@
                 <configuration>
                     <instructions>
                         <Bundle-Activator>com.ning.billing.osgi.bundles.logger.Activator</Bundle-Activator>
-                        <Export-Package />
+                        <Export-Package></Export-Package>
                         <Private-Package>com.ning.billing.osgi.bundles.logger.*</Private-Package>
                         <!-- Optional resolution because exported by the Felix system bundle -->
                         <Import-Package>*;resolution:=optional</Import-Package>

overdue/pom.xml 5(+4 -1)

diff --git a/overdue/pom.xml b/overdue/pom.xml
index 3099737..d528268 100644
--- a/overdue/pom.xml
+++ b/overdue/pom.xml
@@ -73,7 +73,6 @@
             <groupId>com.ning.billing</groupId>
             <artifactId>killbill-util</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.ning.billing</groupId>
             <artifactId>killbill-util</artifactId>
@@ -81,6 +80,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 2ecc24f..b72960e 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -22,15 +22,15 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.overdue.OverdueProperties;
 import com.ning.billing.overdue.listener.OverdueListener;
 import com.ning.billing.overdue.service.DefaultOverdueService;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
 import com.google.inject.Inject;
 
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 11cee68..2335771 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -26,7 +26,12 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.junction.api.Blockable;
 import com.ning.billing.junction.api.Type;
+import com.ning.billing.notificationq.Notification;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.queue.PersistentQueueBase;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
@@ -35,11 +40,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.queue.PersistentQueueBase;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
@@ -88,14 +88,14 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                         }
 
                         for (int i = minIndexToDeleteFrom; i < futureNotifications.size(); i++) {
-                            checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotifications.get(i).getId(), context);
+                            checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getSqlDao(), futureNotifications.get(i).getId());
                         }
                     }
 
                     if (shouldInsertNewNotification) {
                         log.debug("Queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
                         final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
-                        checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, notificationKey, context);
+                        checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getSqlDao(), futureNotificationTime, notificationKey, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
                     } else {
                         log.debug("Skipping queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
                     }
@@ -119,7 +119,7 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                 public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                     final List<Notification> futureNotifications = getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue, overdueable, context);
                     for (final Notification notification : futureNotifications) {
-                        checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, notification.getId(), context);
+                        checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getSqlDao(), notification.getId());
                     }
 
                     return null;
@@ -137,7 +137,7 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                                    final InternalCallContext context) {
         final List<Notification> notifications = new ArrayList<Notification>();
 
-        final List<Notification> candidates = checkOverdueQueue.getFutureNotificationsForAccountFromTransaction(entitySqlDaoWrapperFactory, context);
+        final List<Notification> candidates = checkOverdueQueue.getFutureNotificationsForAccountFromTransaction(context.getAccountRecordId(), entitySqlDaoWrapperFactory.getSqlDao());
         for (final Notification candidate : candidates) {
             if (OverdueCheckNotificationKey.class.getName().equals(candidate.getNotificationKeyClass())) {
                 final OverdueCheckNotificationKey key = PersistentQueueBase.deserializeEvent(candidate.getNotificationKeyClass(), candidate.getNotificationKey());
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotificationKey.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotificationKey.java
index d74c813..ac96145 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotificationKey.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotificationKey.java
@@ -19,7 +19,7 @@ package com.ning.billing.ovedue.notification;
 import java.util.UUID;
 
 import com.ning.billing.junction.api.Type;
-import com.ning.billing.util.notificationq.DefaultUUIDNotificationKey;
+import com.ning.billing.notificationq.DefaultUUIDNotificationKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index c145950..3030cf3 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -30,6 +30,7 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.catalog.api.ActionPolicy;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
@@ -58,7 +59,6 @@ import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.junction.BlockingInternalApi;
 import com.ning.billing.util.svcapi.junction.DefaultBlockingState;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.Tag;
 
@@ -73,7 +73,7 @@ public class OverdueStateApplicator<T extends Blockable> {
     private final BlockingInternalApi blockingApi;
     private final Clock clock;
     private final OverdueCheckPoster poster;
-    private final InternalBus bus;
+    private final PersistentBus bus;
     private final AccountInternalApi accountApi;
     private final EntitlementInternalApi entitlementUserApi;
     private final OverdueEmailGenerator overdueEmailGenerator;
@@ -83,7 +83,7 @@ public class OverdueStateApplicator<T extends Blockable> {
     @Inject
     public OverdueStateApplicator(final BlockingInternalApi accessApi, final AccountInternalApi accountApi, final EntitlementInternalApi entitlementUserApi,
                                   final Clock clock, final OverdueCheckPoster poster, final OverdueEmailGenerator overdueEmailGenerator,
-                                  final EmailConfig config, final InternalBus bus, final TagInternalApi tagApi) {
+                                  final EmailConfig config, final PersistentBus bus, final TagInternalApi tagApi) {
         this.blockingApi = accessApi;
         this.accountApi = accountApi;
         this.entitlementUserApi = entitlementUserApi;
@@ -141,7 +141,7 @@ public class OverdueStateApplicator<T extends Blockable> {
         }
 
         try {
-            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName(), context), context);
+            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName(), context));
         } catch (Exception e) {
             log.error("Error posting overdue change event to bus", e);
         }
@@ -156,7 +156,7 @@ public class OverdueStateApplicator<T extends Blockable> {
         clearFutureNotification(overdueable, context);
 
         try {
-            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, clearState.getName(), context), context);
+            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, clearState.getName(), context));
         } catch (Exception e) {
             log.error("Error posting overdue change event to bus", e);
         }
diff --git a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
index dee9f20..5e588c2 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.ovedue.notification.OverdueCheckNotifier;
@@ -34,11 +35,11 @@ import com.ning.billing.overdue.listener.OverdueListener;
 import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
 import com.ning.billing.util.config.catalog.XMLLoader;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.inject.Inject;
 
 public class DefaultOverdueService implements OverdueService {
+
     private static final Logger log = LoggerFactory.getLogger(DefaultOverdueService.class);
 
     public static final String OVERDUE_SERVICE_NAME = "overdue-service";
diff --git a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
index dc8ffa6..d5e39f7 100644
--- a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
@@ -30,6 +30,8 @@ import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.junction.api.Blockable;
 import com.ning.billing.junction.api.Type;
+import com.ning.billing.notificationq.Notification;
+import com.ning.billing.notificationq.NotificationQueue;
 import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
 import com.ning.billing.overdue.service.DefaultOverdueService;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
@@ -37,8 +39,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.jackson.ObjectMapper;
-import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationQueue;
 
 public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
 
@@ -81,7 +81,7 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         insertOverdueCheckAndVerifyQueueContent(otherOverdueable, 15, 5);
 
         // Verify the final content of the queue
-        Assert.assertEquals(overdueQueue.getFutureNotificationsForAccount(internalCallContext).size(), 2);
+        Assert.assertEquals(overdueQueue.getFutureNotificationsForAccount(internalCallContext.getAccountRecordId()).size(), 2);
     }
 
     private void insertOverdueCheckAndVerifyQueueContent(final Blockable overdueable, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteNoDB.java b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteNoDB.java
index e48240a..9cd571f 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteNoDB.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteNoDB.java
@@ -21,7 +21,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.ovedue.notification.OverdueCheckNotifier;
 import com.ning.billing.ovedue.notification.OverdueCheckPoster;
 import com.ning.billing.overdue.applicator.OverdueBusListenerTester;
@@ -31,13 +33,11 @@ import com.ning.billing.overdue.glue.TestOverdueModuleNoDB;
 import com.ning.billing.overdue.service.DefaultOverdueService;
 import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.junction.BlockingInternalApi;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -58,7 +58,7 @@ public abstract class OverdueTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected EntitlementInternalApi entitlementApi;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected InternalCallContextFactory internalCallContextFactory;
     @Inject
diff --git a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
index 7b91843..bebc309 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
@@ -21,7 +21,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.ovedue.notification.OverdueCheckNotifier;
 import com.ning.billing.ovedue.notification.OverdueCheckPoster;
 import com.ning.billing.overdue.applicator.OverdueBusListenerTester;
@@ -33,13 +35,11 @@ import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.dao.NonEntityDao;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.junction.BlockingInternalApi;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -62,7 +62,7 @@ public abstract class OverdueTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected CacheControllerDispatcher cacheControllerDispatcher;
     @Inject
-    protected InternalBus bus;
+    protected PersistentBus bus;
     @Inject
     protected InternalCallContextFactory internalCallContextFactory;
     @Inject

payment/pom.xml 4(+4 -0)

diff --git a/payment/pom.xml b/payment/pom.xml
index 9ce0508..fca1716 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -93,6 +93,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.ning.billing.plugin</groupId>
             <artifactId>killbill-plugin-api-payment</artifactId>
         </dependency>
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
index 3bd3665..5c198ac 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
 import com.ning.billing.payment.api.DefaultPaymentMethod;
 import com.ning.billing.payment.api.PaymentApiException;
@@ -51,7 +52,6 @@ import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
@@ -69,7 +69,7 @@ public class PaymentMethodProcessor extends ProcessorBase {
     public PaymentMethodProcessor(final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
                                   final AccountInternalApi accountInternalApi,
                                   final InvoiceInternalApi invoiceApi,
-                                  final InternalBus eventBus,
+                                  final PersistentBus eventBus,
                                   final PaymentDao paymentDao,
                                   final TagInternalApi tagUserApi,
                                   final GlobalLocker locker,
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index 016c534..d9ef813 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -30,13 +30,13 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
-import org.skife.config.TimeSpan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
@@ -67,7 +67,6 @@ import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
@@ -102,7 +101,7 @@ public class PaymentProcessor extends ProcessorBase {
                             final PluginFailureRetryServiceScheduler pluginFailureRetryService,
                             final AutoPayRetryServiceScheduler autoPayoffRetryService,
                             final PaymentDao paymentDao,
-                            final InternalBus eventBus,
+                            final PersistentBus eventBus,
                             final Clock clock,
                             final GlobalLocker locker,
                             final PaymentConfig paymentConfig,
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index f6f23c3..172d654 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -30,6 +30,8 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
@@ -48,8 +50,6 @@ import com.ning.billing.util.globallocker.LockFailedException;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.Tag;
 
@@ -62,7 +62,7 @@ public abstract class ProcessorBase {
 
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
     protected final AccountInternalApi accountInternalApi;
-    protected final InternalBus eventBus;
+    protected final PersistentBus eventBus;
     protected final GlobalLocker locker;
     protected final ExecutorService executor;
     protected final PaymentDao paymentDao;
@@ -73,7 +73,7 @@ public abstract class ProcessorBase {
 
     public ProcessorBase(final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
                          final AccountInternalApi accountInternalApi,
-                         final InternalBus eventBus,
+                         final PersistentBus eventBus,
                          final PaymentDao paymentDao,
                          final TagInternalApi tagInternalApi,
                          final GlobalLocker locker,
@@ -142,7 +142,7 @@ public abstract class ProcessorBase {
             return;
         }
         try {
-            eventBus.post(ev, context);
+            eventBus.post(ev);
         } catch (EventBusException e) {
             log.error("Failed to post Payment event event for account {} ", accountId, e);
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
index e76087d..c625d60 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
@@ -35,6 +35,7 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
@@ -58,7 +59,6 @@ import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
 import com.ning.billing.util.svcapi.tag.TagInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
@@ -79,7 +79,7 @@ public class RefundProcessor extends ProcessorBase {
     public RefundProcessor(final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
                            final AccountInternalApi accountApi,
                            final InvoiceInternalApi invoiceApi,
-                           final InternalBus eventBus,
+                           final PersistentBus eventBus,
                            final InternalCallContextFactory internalCallContextFactory,
                            final TagInternalApi tagUserApi,
                            final PaymentDao paymentDao,
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index be948d2..97c7512 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -19,9 +19,11 @@ package com.ning.billing.payment.glue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Inject;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.bus.InvoiceHandler;
@@ -29,9 +31,8 @@ import com.ning.billing.payment.bus.PaymentTagHandler;
 import com.ning.billing.payment.retry.AutoPayRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.PluginFailureRetryService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+
+import com.google.inject.Inject;
 
 public class DefaultPaymentService implements PaymentService {
 
@@ -41,7 +42,7 @@ public class DefaultPaymentService implements PaymentService {
 
     private final InvoiceHandler invoiceHandler;
     private final PaymentTagHandler tagHandler;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final PaymentApi api;
     private final FailedPaymentRetryService failedRetryService;
     private final PluginFailureRetryService timedoutRetryService;
@@ -49,11 +50,11 @@ public class DefaultPaymentService implements PaymentService {
 
     @Inject
     public DefaultPaymentService(final InvoiceHandler invoiceHandler,
-            final PaymentTagHandler tagHandler,
-            final PaymentApi api, final InternalBus eventBus,
-            final FailedPaymentRetryService failedRetryService,
-            final PluginFailureRetryService timedoutRetryService,
-            final AutoPayRetryService autoPayoffRetryService) {
+                                 final PaymentTagHandler tagHandler,
+                                 final PaymentApi api, final PersistentBus eventBus,
+                                 final FailedPaymentRetryService failedRetryService,
+                                 final PluginFailureRetryService timedoutRetryService,
+                                 final AutoPayRetryService autoPayoffRetryService) {
         this.invoiceHandler = invoiceHandler;
         this.tagHandler = tagHandler;
         this.eventBus = eventBus;
@@ -73,7 +74,7 @@ public class DefaultPaymentService implements PaymentService {
         try {
             eventBus.register(invoiceHandler);
             eventBus.register(tagHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             log.error("Unable to register with the EventBus!", e);
         }
         failedRetryService.initialize(SERVICE_NAME);
@@ -93,7 +94,7 @@ public class DefaultPaymentService implements PaymentService {
         try {
             eventBus.unregister(invoiceHandler);
             eventBus.unregister(tagHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
         }
         failedRetryService.stop();
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
index d695b8a..61d7d26 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -20,11 +20,11 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.Inject;
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index f3b10e9..4fbc849 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -24,6 +24,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ObjectType;
+import com.ning.billing.notificationq.NotificationKey;
+import com.ning.billing.notificationq.NotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.payment.glue.DefaultPaymentService;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
@@ -32,12 +38,6 @@ import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
 import com.google.inject.Inject;
 
@@ -137,9 +137,9 @@ public abstract class BaseRetryService implements RetryService {
                 final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
                 if (retryQueue != null) {
                     if (transactionalDao == null) {
-                        retryQueue.recordFutureNotification(timeOfRetry, key, context);
+                        retryQueue.recordFutureNotification(timeOfRetry, key, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
                     } else {
-                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key, context);
+                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao.getSqlDao(), timeOfRetry, key, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
                     }
                 }
             } catch (NoSuchNotificationQueue e) {
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 0abe3db..3a70556 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -23,12 +23,12 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.Inject;
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PaymentRetryNotificationKey.java b/payment/src/main/java/com/ning/billing/payment/retry/PaymentRetryNotificationKey.java
index 7a9f6ba..dae5d11 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PaymentRetryNotificationKey.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PaymentRetryNotificationKey.java
@@ -17,9 +17,10 @@ package com.ning.billing.payment.retry;
 
 import java.util.UUID;
 
+import com.ning.billing.notificationq.DefaultUUIDNotificationKey;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.ning.billing.util.notificationq.DefaultUUIDNotificationKey;
 
 public class PaymentRetryNotificationKey extends DefaultUUIDNotificationKey {
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 1e7e3b9..6403013 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -22,6 +22,7 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -29,7 +30,6 @@ import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.Inject;
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
index 30180b9..21ed3dc 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
@@ -17,9 +17,9 @@ package com.ning.billing.payment.retry;
 
 import java.util.UUID;
 
+import com.ning.billing.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 
 public interface RetryService {
 
diff --git a/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteNoDB.java b/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteNoDB.java
index 86c3ddd..e6fa56a 100644
--- a/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteNoDB.java
+++ b/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteNoDB.java
@@ -24,6 +24,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.core.PaymentMethodProcessor;
@@ -36,7 +37,6 @@ import com.ning.billing.payment.retry.PluginFailureRetryService;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -59,7 +59,7 @@ public abstract class PaymentTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected PluginFailureRetryService pluginRetryService;
     @Inject
-    protected InternalBus eventBus;
+    protected PersistentBus eventBus;
     @Inject
     protected PaymentApi paymentApi;
     @Inject
diff --git a/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteWithEmbeddedDB.java b/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
index ee2b72d..c9cee71 100644
--- a/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
+++ b/payment/src/test/java/com/ning/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
@@ -24,6 +24,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.osgi.api.OSGIServiceRegistration;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.core.PaymentMethodProcessor;
@@ -37,7 +38,6 @@ import com.ning.billing.payment.retry.PluginFailureRetryService;
 import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -60,7 +60,7 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected PluginFailureRetryService pluginRetryService;
     @Inject
-    protected InternalBus eventBus;
+    protected PersistentBus eventBus;
     @Inject
     protected PaymentApi paymentApi;
     @Inject
diff --git a/payment/src/test/java/com/ning/billing/payment/TestPaymentHelper.java b/payment/src/test/java/com/ning/billing/payment/TestPaymentHelper.java
index 4263a5e..096cc8e 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestPaymentHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestPaymentHelper.java
@@ -22,6 +22,8 @@ import org.joda.time.LocalDate;
 import org.mockito.Mockito;
 
 import com.ning.billing.account.api.Account;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
@@ -37,8 +39,6 @@ import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.events.InvoiceCreationInternalEvent;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.invoice.InvoiceInternalApi;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.svcsapi.bus.InternalBus.EventBusException;
 
 import com.google.inject.Inject;
 
@@ -47,7 +47,7 @@ public class TestPaymentHelper {
     protected final AccountInternalApi AccountApi;
     protected final InvoiceInternalApi invoiceApi;
     protected PaymentApi paymentApi;
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
     private final Clock clock;
 
     private final CallContext context;
@@ -55,7 +55,7 @@ public class TestPaymentHelper {
 
     @Inject
     public TestPaymentHelper(final AccountInternalApi AccountApi, final InvoiceInternalApi invoiceApi,
-                             final PaymentApi paymentApi, final InternalBus eventBus, final Clock clock,
+                             final PaymentApi paymentApi, final PersistentBus eventBus, final Clock clock,
                              final CallContext context, final InternalCallContext internalCallContext) {
         this.eventBus = eventBus;
         this.AccountApi = AccountApi;
@@ -92,11 +92,11 @@ public class TestPaymentHelper {
 
         Mockito.when(invoiceApi.getInvoiceById(Mockito.eq(invoice.getId()), Mockito.<InternalTenantContext>any())).thenReturn(invoice);
         final InvoiceCreationInternalEvent event = new MockInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
-                                                                        invoice.getBalance(), invoice.getCurrency(),
-                                                                        invoice.getInvoiceDate(),
-                                                                        context.getUserToken());
+                                                                                invoice.getBalance(), invoice.getCurrency(),
+                                                                                invoice.getInvoiceDate(),
+                                                                                context.getUserToken());
 
-        eventBus.post(event, internalCallContext);
+        eventBus.post(event);
         return invoice;
     }
 

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index c375b48..98eec89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill-oss-parent</artifactId>
         <groupId>com.ning.billing</groupId>
-        <version>0.2.6</version>
+        <version>0.2.7-SNAPSHOT</version>
     </parent>
     <artifactId>killbill</artifactId>
     <version>0.3.2-SNAPSHOT</version>

server/pom.xml 4(+4 -0)

diff --git a/server/pom.xml b/server/pom.xml
index 995c5c3..bc0f921 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -142,6 +142,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.ning.jetty</groupId>
             <artifactId>ning-service-skeleton-base</artifactId>
         </dependency>
diff --git a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
index d42045d..31065db 100644
--- a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
+++ b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
@@ -26,15 +26,15 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.beatrix.bus.api.ExternalBus;
 import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.jaxrs.resources.JaxRsResourceBase;
 import com.ning.billing.jaxrs.util.KillbillEventHandler;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.server.config.KillbillServerConfig;
 import com.ning.billing.server.healthchecks.KillbillHealthcheck;
 import com.ning.billing.server.modules.KillbillServerModule;
 import com.ning.billing.server.security.TenantFilter;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.jetty.base.modules.ServerModuleBuilder;
 import com.ning.jetty.core.listeners.SetupServer;
 
@@ -74,11 +74,11 @@ public class KillbillGuiceListener extends SetupServer {
                 .addHealthCheck(KillbillHealthcheck.class)
                 .addJMXExport(KillbillHealthcheck.class)
                 .addJMXExport(NotificationQueueService.class)
-                .addJMXExport(InternalBus.class)
+                .addJMXExport(PersistentBus.class)
                 .addJMXExport(ExternalBus.class)
                 .addModule(getModule())
-                // Don't filter all requests through Jersey, only the JAX-RS APIs (otherwise,
-                // things like static resources, favicon, etc. are 404'ed)
+                        // Don't filter all requests through Jersey, only the JAX-RS APIs (otherwise,
+                        // things like static resources, favicon, etc. are 404'ed)
                 .setJerseyUriPattern("(" + JaxRsResourceBase.PREFIX + "|" + JaxRsResourceBase.PLUGINS_PATH + ")" + "/.*")
                 .addJerseyResource("com.ning.billing.jaxrs.mappers")
                 .addJerseyResource("com.ning.billing.jaxrs.resources");
@@ -116,7 +116,7 @@ public class KillbillGuiceListener extends SetupServer {
         //
         try {
             killbillBusService.getBus().register(killbilleventHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             logger.error("Failed to register for event notifications, this is bad exiting!", e);
             System.exit(1);
         }
@@ -139,7 +139,7 @@ public class KillbillGuiceListener extends SetupServer {
 
         try {
             killbillBusService.getBus().unregister(killbilleventHandler);
-        } catch (InternalBus.EventBusException e) {
+        } catch (PersistentBus.EventBusException e) {
             logger.warn("Failed to unregister for event notifications", e);
         }
 

util/pom.xml 15(+15 -0)

diff --git a/util/pom.xml b/util/pom.xml
index a2bb616..0756925 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -78,6 +78,21 @@
             <artifactId>killbill-internal-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-queue</artifactId>
+            <version>${killbill-commons.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-util</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.ning.billing.plugin</groupId>
             <artifactId>killbill-plugin-api-notification</artifactId>
         </dependency>
diff --git a/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java b/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
index b3d93bd..c006c82 100644
--- a/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
+++ b/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
@@ -16,10 +16,10 @@
 
 package com.ning.billing.util.bus;
 
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Inject;
 
@@ -32,10 +32,10 @@ public class DefaultBusService implements BusService {
     public static final String EVENT_BUS_SERVICE = "bus-service";
     public static final String EVENT_BUS_IDENTIFIER = EVENT_BUS_SERVICE;
 
-    private final InternalBus eventBus;
+    private final PersistentBus eventBus;
 
     @Inject
-    public DefaultBusService(final InternalBus eventBus) {
+    public DefaultBusService(final PersistentBus eventBus) {
         this.eventBus = eventBus;
     }
 
@@ -55,7 +55,7 @@ public class DefaultBusService implements BusService {
     }
 
     @Override
-    public InternalBus getBus() {
+    public PersistentBus getBus() {
         return eventBus;
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/customfield/dao/DefaultCustomFieldDao.java b/util/src/main/java/com/ning/billing/util/customfield/dao/DefaultCustomFieldDao.java
index 61f6f03..c040d22 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/dao/DefaultCustomFieldDao.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/dao/DefaultCustomFieldDao.java
@@ -22,12 +22,15 @@ import java.util.UUID;
 import javax.annotation.Nullable;
 
 import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.BillingExceptionBase;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.dao.PersistentBusSqlDao;
 import com.ning.billing.util.api.CustomFieldApiException;
 import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
@@ -44,8 +47,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.events.BusInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
-import com.ning.billing.util.tag.dao.TagModelDao;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
@@ -56,10 +57,10 @@ public class DefaultCustomFieldDao extends EntityDaoBase<CustomFieldModelDao, Cu
 
     private final static Logger log = LoggerFactory.getLogger(DefaultCustomFieldDao.class);
 
-    private final InternalBus bus;
+    private final PersistentBus bus;
 
     @Inject
-    public DefaultCustomFieldDao(final IDBI dbi, final Clock clock, final CacheControllerDispatcher controllerDispatcher, final NonEntityDao nonEntityDao, final InternalBus bus) {
+    public DefaultCustomFieldDao(final IDBI dbi, final Clock clock, final CacheControllerDispatcher controllerDispatcher, final NonEntityDao nonEntityDao, final PersistentBus bus) {
         super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, controllerDispatcher, nonEntityDao), CustomFieldSqlDao.class);
         this.bus = bus;
     }
@@ -120,8 +121,8 @@ public class DefaultCustomFieldDao extends EntityDaoBase<CustomFieldModelDao, Cu
         }
 
         try {
-            bus.postFromTransaction(customFieldEvent, entitySqlDaoWrapperFactory, context);
-        } catch (InternalBus.EventBusException e) {
+            bus.postFromTransaction(customFieldEvent, entitySqlDaoWrapperFactory.getSqlDao());
+        } catch (PersistentBus.EventBusException e) {
             log.warn("Failed to post tag event for custom field " + customField.getId().toString(), e);
         }
 
diff --git a/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDaoWrapperFactory.java b/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDaoWrapperFactory.java
index 08d5a35..559859f 100644
--- a/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDaoWrapperFactory.java
+++ b/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDaoWrapperFactory.java
@@ -63,6 +63,10 @@ public class EntitySqlDaoWrapperFactory<InitialSqlDao extends EntitySqlDao> {
         return sqlDao.become(newTransactionalClass);
     }
 
+    public InitialSqlDao getSqlDao() {
+        return sqlDao;
+    }
+
     private <NewSqlDao extends EntitySqlDao<NewEntityModelDao, NewEntity>,
             NewEntityModelDao extends EntityModelDao<NewEntity>,
             NewEntity extends Entity> NewSqlDao create(final Class<NewSqlDao> newSqlDaoClass, final NewSqlDao newSqlDao) {
diff --git a/util/src/main/java/com/ning/billing/util/events/BusInternalEvent.java b/util/src/main/java/com/ning/billing/util/events/BusInternalEvent.java
index 0b64bf4..13629bb 100644
--- a/util/src/main/java/com/ning/billing/util/events/BusInternalEvent.java
+++ b/util/src/main/java/com/ning/billing/util/events/BusInternalEvent.java
@@ -16,9 +16,9 @@
 
 package com.ning.billing.util.events;
 
-import java.util.UUID;
+import com.ning.billing.bus.BusPersistentEvent;
 
-public interface BusInternalEvent {
+public interface BusInternalEvent extends BusPersistentEvent {
 
     public enum BusInternalEventType {
         ACCOUNT_CREATE,
@@ -45,9 +45,4 @@ public interface BusInternalEvent {
 
     public BusInternalEventType getBusEventType();
 
-    public UUID getUserToken();
-
-    public Long getTenantRecordId();
-
-    public Long getAccountRecordId();
 }
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 7712d6c..aaeaecb 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -19,12 +19,12 @@ package com.ning.billing.util.glue;
 import org.skife.config.ConfigSource;
 import org.skife.config.ConfigurationObjectFactory;
 
+import com.ning.billing.bus.DefaultPersistentBus;
+import com.ning.billing.bus.InMemoryPersistentBus;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBusConfig;
 import com.ning.billing.util.bus.DefaultBusService;
-import com.ning.billing.util.bus.InMemoryInternalBus;
-import com.ning.billing.util.bus.PersistentBusConfig;
-import com.ning.billing.util.bus.PersistentInternalBus;
 import com.ning.billing.util.svcsapi.bus.BusService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.AbstractModule;
 
@@ -70,10 +70,10 @@ public class BusModule extends AbstractModule {
 
     private void configurePersistentEventBus() {
         configurePersistentBusConfig();
-        bind(InternalBus.class).to(PersistentInternalBus.class).asEagerSingleton();
+        bind(PersistentBus.class).to(DefaultPersistentBus.class).asEagerSingleton();
     }
 
     private void configureInMemoryEventBus() {
-        bind(InternalBus.class).to(InMemoryInternalBus.class).asEagerSingleton();
+        bind(PersistentBus.class).to(InMemoryPersistentBus.class).asEagerSingleton();
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
index 97f2a13..b5a4294 100644
--- a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -19,10 +19,9 @@ package com.ning.billing.util.glue;
 import org.skife.config.ConfigSource;
 import org.skife.config.ConfigurationObjectFactory;
 
-import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueConfig;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-
+import com.ning.billing.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueConfig;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.google.inject.AbstractModule;
 
 public class NotificationQueueModule extends AbstractModule {
diff --git a/util/src/main/java/com/ning/billing/util/svcsapi/bus/BusService.java b/util/src/main/java/com/ning/billing/util/svcsapi/bus/BusService.java
index b8670c8..766f74d 100644
--- a/util/src/main/java/com/ning/billing/util/svcsapi/bus/BusService.java
+++ b/util/src/main/java/com/ning/billing/util/svcsapi/bus/BusService.java
@@ -16,10 +16,11 @@
 
 package com.ning.billing.util.svcsapi.bus;
 
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.lifecycle.KillbillService;
 
 public interface BusService extends KillbillService {
 
    // API_FIX should be removed and server should only use public bus
-   public InternalBus getBus();
+   public PersistentBus getBus();
 }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDao.java
index 6ae85a9..2c47961 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDao.java
@@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.BillingExceptionBase;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.util.api.TagApiException;
 import com.ning.billing.util.audit.ChangeType;
-import com.ning.billing.util.cache.Cachable.CacheType;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -42,7 +42,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.events.TagInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.api.user.TagEventBuilder;
@@ -57,10 +56,10 @@ public class DefaultTagDao extends EntityDaoBase<TagModelDao, Tag, TagApiExcepti
     private static final Logger log = LoggerFactory.getLogger(DefaultTagDao.class);
 
     private final TagEventBuilder tagEventBuilder;
-    private final InternalBus bus;
+    private final PersistentBus bus;
 
     @Inject
-    public DefaultTagDao(final IDBI dbi, final TagEventBuilder tagEventBuilder, final InternalBus bus, final Clock clock,
+    public DefaultTagDao(final IDBI dbi, final TagEventBuilder tagEventBuilder, final PersistentBus bus, final Clock clock,
                          final CacheControllerDispatcher controllerDispatcher, final NonEntityDao nonEntityDao) {
         super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, controllerDispatcher, nonEntityDao), TagSqlDao.class);
         this.tagEventBuilder = tagEventBuilder;
@@ -125,8 +124,8 @@ public class DefaultTagDao extends EntityDaoBase<TagModelDao, Tag, TagApiExcepti
         }
 
         try {
-            bus.postFromTransaction(tagEvent, entitySqlDaoWrapperFactory, context);
-        } catch (InternalBus.EventBusException e) {
+            bus.postFromTransaction(tagEvent, entitySqlDaoWrapperFactory.getSqlDao());
+        } catch (PersistentBus.EventBusException e) {
             log.warn("Failed to post tag event for tag " + tag.getId().toString(), e);
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
index 8b8b2fd..6bc6fff 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import com.ning.billing.BillingExceptionBase;
 import com.ning.billing.ErrorCode;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.util.api.TagDefinitionApiException;
 import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
@@ -41,7 +42,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.events.TagDefinitionInternalEvent;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.TagDefinition;
 import com.ning.billing.util.tag.api.user.TagEventBuilder;
@@ -55,10 +55,10 @@ public class DefaultTagDefinitionDao extends EntityDaoBase<TagDefinitionModelDao
     private static final Logger log = LoggerFactory.getLogger(DefaultTagDefinitionDao.class);
 
     private final TagEventBuilder tagEventBuilder;
-    private final InternalBus bus;
+    private final PersistentBus bus;
 
     @Inject
-    public DefaultTagDefinitionDao(final IDBI dbi, final TagEventBuilder tagEventBuilder, final InternalBus bus, final Clock clock,
+    public DefaultTagDefinitionDao(final IDBI dbi, final TagEventBuilder tagEventBuilder, final PersistentBus bus, final Clock clock,
                                    final CacheControllerDispatcher controllerDispatcher, final NonEntityDao nonEntityDao) {
         super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, controllerDispatcher, nonEntityDao), TagDefinitionSqlDao.class);
         this.tagEventBuilder = tagEventBuilder;
@@ -174,8 +174,8 @@ public class DefaultTagDefinitionDao extends EntityDaoBase<TagDefinitionModelDao
                         tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionCreationEvent(tagDefinition.getId(), tagDefinition, context);
                     }
                     try {
-                        bus.postFromTransaction(tagDefinitionEvent, entitySqlDaoWrapperFactory, context);
-                    } catch (InternalBus.EventBusException e) {
+                        bus.postFromTransaction(tagDefinitionEvent, entitySqlDaoWrapperFactory.getSqlDao());
+                    } catch (PersistentBus.EventBusException e) {
                         log.warn("Failed to post tag definition creation event for tag " + tagDefinition.getId(), e);
                     }
 
@@ -249,8 +249,8 @@ public class DefaultTagDefinitionDao extends EntityDaoBase<TagDefinitionModelDao
         }
 
         try {
-            bus.postFromTransaction(tagDefinitionEvent, entitySqlDaoWrapperFactory, context);
-        } catch (InternalBus.EventBusException e) {
+            bus.postFromTransaction(tagDefinitionEvent, entitySqlDaoWrapperFactory.getSqlDao());
+        } catch (PersistentBus.EventBusException e) {
             log.warn("Failed to post tag definition event for tag " + tagDefinition.getId().toString(), e);
         }
     }
diff --git a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
index 46fb7d9..37b2a9f 100644
--- a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
+++ b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
@@ -19,10 +19,10 @@ package com.ning.billing.mock.glue;
 import org.skife.config.ConfigSource;
 import org.skife.config.ConfigurationObjectFactory;
 
+import com.ning.billing.notificationq.MockNotificationQueueService;
+import com.ning.billing.notificationq.NotificationQueueConfig;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.glue.NotificationQueueModule;
-import com.ning.billing.util.notificationq.MockNotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueConfig;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class MockNotificationQueueModule extends NotificationQueueModule {
 
diff --git a/util/src/test/java/com/ning/billing/util/UtilTestSuiteNoDB.java b/util/src/test/java/com/ning/billing/util/UtilTestSuiteNoDB.java
index 2022493..58d5d92 100644
--- a/util/src/test/java/com/ning/billing/util/UtilTestSuiteNoDB.java
+++ b/util/src/test/java/com/ning/billing/util/UtilTestSuiteNoDB.java
@@ -23,13 +23,13 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteNoDB;
+import com.ning.billing.bus.PersistentBus;
 import com.ning.billing.util.api.AuditUserApi;
 import com.ning.billing.util.audit.dao.AuditDao;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.glue.TestUtilModuleNoDB;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -39,7 +39,7 @@ public class UtilTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
 
 
     @Inject
-    protected InternalBus eventBus;
+    protected PersistentBus eventBus;
     @Inject
     protected CacheControllerDispatcher controlCacheDispatcher;
     @Inject
diff --git a/util/src/test/java/com/ning/billing/util/UtilTestSuiteWithEmbeddedDB.java b/util/src/test/java/com/ning/billing/util/UtilTestSuiteWithEmbeddedDB.java
index 39a9731..b370086 100644
--- a/util/src/test/java/com/ning/billing/util/UtilTestSuiteWithEmbeddedDB.java
+++ b/util/src/test/java/com/ning/billing/util/UtilTestSuiteWithEmbeddedDB.java
@@ -23,6 +23,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
 import com.ning.billing.GuicyKillbillTestSuiteWithEmbeddedDB;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.notificationq.NotificationQueueService;
 import com.ning.billing.util.audit.dao.AuditDao;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -31,8 +33,6 @@ import com.ning.billing.util.customfield.dao.CustomFieldDao;
 import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.export.dao.DatabaseExportDao;
 import com.ning.billing.util.glue.TestUtilModuleWithEmbeddedDB;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.svcsapi.bus.InternalBus;
 import com.ning.billing.util.tag.dao.DefaultTagDao;
 import com.ning.billing.util.tag.dao.TagDefinitionDao;
 
@@ -44,7 +44,7 @@ import com.google.inject.Stage;
 public abstract class UtilTestSuiteWithEmbeddedDB extends GuicyKillbillTestSuiteWithEmbeddedDB {
 
     @Inject
-    protected InternalBus eventBus;
+    protected PersistentBus eventBus;
     @Inject
     protected CacheControllerDispatcher controlCacheDispatcher;
     @Inject