killbill-memoizeit

Changes

entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionFactory.java 63(+0 -63)

meter/pom.xml 3(+2 -1)

Details

diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7bd0ae1..c80e303 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -44,7 +44,6 @@ import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -58,9 +57,11 @@ import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.config.CatalogConfig;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 
+
 public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
 
     private BusinessAccountTagSqlDao accountTagSqlDao;
@@ -73,6 +74,19 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
     private EntitlementUserApi entitlementUserApi;
     private BusinessTagDao tagDao;
 
+
+    private NotificationQueueConfig config = new NotificationQueueConfig() {
+        @Override
+        public boolean isNotificationProcessingOff() {
+            return false;
+        }
+
+        @Override
+        public long getSleepTimeMs() {
+            return 3000;
+        }
+    };
+
     @BeforeMethod(groups = "slow")
     public void setUp() throws Exception {
         final Clock clock = new ClockMock();
@@ -89,13 +103,12 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
         accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao);
         final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
         final AddonUtils addonUtils = new AddonUtils(catalogService);
-        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
         final EntitlementDao entitlementDao = new DefaultEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
         final PlanAligner planAligner = new PlanAligner(catalogService);
-        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
-        final DefaultSubscriptionFactory subscriptionFactory = new DefaultSubscriptionFactory(apiService, clock, catalogService);
-        entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, subscriptionFactory);
-        entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, subscriptionFactory, addonUtils, internalCallContextFactory);
+        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+        entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, apiService, clock, catalogService);
+        entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, addonUtils, internalCallContextFactory);
         tagDao = new BusinessTagDao(accountTagSqlDao, invoicePaymentTagSqlDao, invoiceTagSqlDao, subscriptionTransitionTagSqlDao,
                                     accountApi, entitlementApi);
 
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index e04fed7..5ecdbb6 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -292,4 +292,12 @@ public interface InvoiceUserApi {
      * @throws InvoiceApiException
      */
     public String getInvoiceAsHTML(UUID invoiceId, TenantContext context) throws AccountApiException, IOException, InvoiceApiException;
+
+    /**
+     * Rebalance CBA for account which have credit and unpaid invoices-- only needed if system is configured to not rebalance automatically.
+     *
+     * @param accountId account id
+     * @param context the callcontext
+     */
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context);
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index ef98c42..517ea8d 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -140,7 +140,7 @@ public class BeatrixListener {
         default:
         }
         return eventBusType != null ?
-                new ExtBusEventEntry(hostname, objectType, objectId, eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
+                new ExtBusEventEntry(hostname, objectType, objectId, event.getUserToken(), eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
     }
 
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
index b882a7c..a34e62f 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
@@ -37,12 +37,12 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
     private final ObjectType objectType;
     private final UUID objectId;
     private final ExtBusEventType extBusType;
-
+    private final UUID userToken;
 
     public ExtBusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
-                         final PersistentQueueEntryLifecycleState processingState,
-                         final ObjectType objectType, final UUID objectId, final  ExtBusEventType extBusType,
-                         final Long accountRecordId, final Long tenantRecordId) {
+                            final PersistentQueueEntryLifecycleState processingState,
+                            final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+                            final Long accountRecordId, final Long tenantRecordId) {
         this.id = id;
         this.createdOwner = createdOwner;
         this.owner = owner;
@@ -51,14 +51,14 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
         this.objectType = objectType;
         this.objectId = objectId;
         this.extBusType = extBusType;
+        this.userToken = userToken;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
-    public ExtBusEventEntry(final String createdOwner,
-            final ObjectType objectType, final UUID objectId, final  ExtBusEventType extBusType,
-                         final Long accountRecordId, final Long tenantRecordId) {
-        this(0, createdOwner, null, null, null, objectType, objectId, extBusType, accountRecordId, tenantRecordId);
+    public ExtBusEventEntry(final String createdOwner, final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+                            final Long accountRecordId, final Long tenantRecordId) {
+        this(0, createdOwner, null, null, null, objectType, objectId, userToken, extBusType, accountRecordId, tenantRecordId);
     }
 
     public long getId() {
@@ -78,6 +78,11 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
     }
 
     @Override
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
     public String getOwner() {
         return owner;
     }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
index 2f3c3d0..f174158 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
@@ -13,6 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.beatrix.extbus.dao;
 
 import java.sql.ResultSet;
@@ -42,43 +43,41 @@ import com.ning.billing.util.dao.BinderBase;
 import com.ning.billing.util.dao.MapperBase;
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
-
 @ExternalizedSqlViaStringTemplate3()
 public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
 
-
     @SqlQuery
     @Mapper(ExtBusSqlMapper.class)
     public ExtBusEventEntry getNextBusExtEventEntry(@Bind("max") int max,
-                                              @Bind("owner") String owner,
-                                              @Bind("now") Date now,
-                                              @InternalTenantContextBinder final InternalTenantContext context);
+                                                    @Bind("owner") String owner,
+                                                    @Bind("now") Date now,
+                                                    @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
     public int claimBusExtEvent(@Bind("owner") String owner,
-                             @Bind("nextAvailable") Date nextAvailable,
-                             @Bind("recordId") Long id,
-                             @Bind("now") Date now,
-                             @InternalTenantContextBinder final InternalCallContext context);
+                                @Bind("nextAvailable") Date nextAvailable,
+                                @Bind("recordId") Long id,
+                                @Bind("now") Date now,
+                                @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void clearBusExtEvent(@Bind("recordId") Long id,
-                              @Bind("owner") String owner,
-                              @InternalTenantContextBinder final InternalCallContext context);
+                                 @Bind("owner") String owner,
+                                 @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void removeBusExtEventsById(@Bind("recordId") Long id,
-                                    @InternalTenantContextBinder final InternalCallContext context);
+                                       @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void insertBusExtEvent(@Bind(binder = ExtBusSqlBinder.class) ExtBusEventEntry evt,
-                               @InternalTenantContextBinder final InternalCallContext context);
+                                  @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void insertClaimedExtHistory(@Bind("ownerId") String owner,
-                                     @Bind("claimedDate") Date claimedDate,
-                                     @Bind("busEventId") long id,
-                                     @InternalTenantContextBinder final InternalCallContext context);
+                                        @Bind("claimedDate") Date claimedDate,
+                                        @Bind("busEventId") long id,
+                                        @InternalTenantContextBinder final InternalCallContext context);
 
     public static class ExtBusSqlBinder extends BinderBase implements Binder<Bind, ExtBusEventEntry> {
 
@@ -87,6 +86,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             stmt.bind("eventType", evt.getExtBusType().toString());
             stmt.bind("objectId", evt.getObjectId().toString());
             stmt.bind("objectType", evt.getObjectType().toString());
+            stmt.bind("userToken", getUUIDString(evt.getUserToken()));
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -104,6 +104,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             final ExtBusEventType eventType = ExtBusEventType.valueOf(r.getString("event_type"));
             final UUID objectId = getUUID(r, "object_id");
             final ObjectType objectType = ObjectType.valueOf(r.getString("object_type"));
+            final UUID userToken = getUUID(r, "user_token");
             final String createdOwner = r.getString("creating_owner");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
             final String processingOwner = r.getString("processing_owner");
@@ -111,7 +112,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             final Long accountRecordId = r.getLong("account_record_id");
             final Long tenantRecordId = r.getLong("tenant_record_id");
             return new ExtBusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState,
-                    objectType, objectId, eventType, accountRecordId, tenantRecordId);
+                                        objectType, objectId, userToken, eventType, accountRecordId, tenantRecordId);
         }
     }
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
     private final InternalCallContextFactory internalCallContextFactory;
     private final AccountInternalApi accountApi;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
 
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
         try {
             final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
index 655d4e0..d4b955c 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
@@ -6,6 +6,7 @@ CREATE TABLE bus_ext_events (
     event_type varchar(32) NOT NULL,
     object_id varchar(64) NOT NULL,
     object_type varchar(32) NOT NULL,
+    user_token char(36),
     created_date datetime NOT NULL,
     creating_owner char(50) NOT NULL,
     processing_owner char(50) DEFAULT NULL,
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
index 897d273..c1acfa4 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
@@ -9,6 +9,7 @@ getNextBusExtEventEntry() ::= <<
       , event_type
       , object_id
       , object_type
+      , user_token
       , created_date
       , creating_owner
       , processing_owner
@@ -66,6 +67,7 @@ insertBusExtEvent() ::= <<
      event_type
     , object_id
     , object_type
+    , user_token
     , created_date
     , creating_owner
     , processing_owner
@@ -76,7 +78,8 @@ insertBusExtEvent() ::= <<
     ) values (
       :eventType
     , :objectId
-    , :objectType    
+    , :objectType
+    , :userToken
     , :createdDate
     , :creatingOwner
     , :processingOwner
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index 38b3a50..fcd10f8 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -179,17 +179,17 @@ public class TestOverdueIntegration extends TestOverdueBase {
                                             new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 31), InvoiceItemType.REPAIR_ADJ, new BigDecimal("-249.95")),
                                             new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("249.95")));
         invoiceChecker.checkInvoice(account.getId(), 4, callContext,
-                                    // Note the end date here is not 07-25, but 07-9. The overdue configuration disabled invoicing between 07-09 and 07-23 (e.g. the bundle
+                                    // Note the end date here is not 07-25, but 07-10. The overdue configuration disabled invoicing between 07-10 and 07-23 (e.g. the bundle
                                     // was inaccessible, hence we didn't want to charge the customer for that period, even though the account was overdue).
-                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 9), InvoiceItemType.RECURRING, new BigDecimal("74.99")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 10), InvoiceItemType.RECURRING, new BigDecimal("83.31")),
                                     // Item for the upgraded recurring plan
                                     new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 31), InvoiceItemType.RECURRING, new BigDecimal("154.85")),
                                     // Credits consumed
-                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-229.84")));
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-238.16")));
         invoiceChecker.checkChargedThroughDate(baseSubscription.getId(), new LocalDate(2012, 7, 31), callContext);
 
         // Verify the account balance: 249.95 - 74.99 - 154.85
-        assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-20.11")), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-11.79")), 0);
     }
 
     @Test(groups = "slow")
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
new file mode 100644
index 0000000..39d636b
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.entitlement.events.EntitlementEvent;
+import com.ning.billing.util.clock.Clock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+public class EntitlementApiBase {
+
+    protected final EntitlementDao dao;
+
+    protected final SubscriptionApiService apiService;
+    protected final Clock clock;
+    protected final CatalogService catalogService;
+
+    public EntitlementApiBase(final EntitlementDao dao, final SubscriptionApiService apiService, final Clock clock, final CatalogService catalogService) {
+        this.dao = dao;
+        this.apiService = apiService;
+        this.clock = clock;
+        this.catalogService = catalogService;
+    }
+
+    protected List<Subscription> createSubscriptionsForApiUse(final List<Subscription> internalSubscriptions) {
+        return new ArrayList<Subscription>(Collections2.transform(internalSubscriptions, new Function<Subscription, Subscription>() {
+            @Override
+            public Subscription apply(final Subscription subscription) {
+                return createSubscriptionForApiUse((SubscriptionData) subscription);
+            }
+        }));
+    }
+
+    protected SubscriptionData createSubscriptionForApiUse(final Subscription internalSubscription) {
+        return new SubscriptionData((SubscriptionData) internalSubscription, apiService, clock);
+    }
+
+    protected SubscriptionData createSubscriptionForApiUse(SubscriptionBuilder builder, List<EntitlementEvent> events) {
+        final SubscriptionData subscription = new SubscriptionData(builder, apiService, clock);
+        if (events.size() > 0) {
+            subscription.rebuildTransitions(events, catalogService.getFullCatalog());
+        }
+        return subscription;
+    }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
index a6eb3d9..30bb487 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
@@ -20,7 +20,8 @@ import java.util.List;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index e836d59..1b42632 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -25,13 +25,15 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
 import com.ning.billing.entitlement.alignment.TimedMigration;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -54,24 +56,20 @@ import com.ning.billing.util.clock.Clock;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
+public class DefaultEntitlementMigrationApi extends EntitlementApiBase implements EntitlementMigrationApi {
 
-    private final EntitlementDao dao;
     private final MigrationPlanAligner migrationAligner;
-    private final SubscriptionFactory factory;
-    private final Clock clock;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementMigrationApi(final MigrationPlanAligner migrationAligner,
-                                          final SubscriptionFactory factory,
+                                          final SubscriptionApiService apiService,
+                                          final CatalogService catalogService,
                                           final EntitlementDao dao,
                                           final Clock clock,
                                           final InternalCallContextFactory internalCallContextFactory) {
-        this.dao = dao;
+        super(dao, apiService, clock, catalogService);
         this.migrationAligner = migrationAligner;
-        this.factory = factory;
-        this.clock = clock;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -141,13 +139,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
         final DateTime migrationStartDate = events[0].getEventTime();
         final List<EntitlementEvent> emptyEvents = Collections.emptyList();
-        final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
-                                                                                     .setId(UUID.randomUUID())
-                                                                                     .setBundleId(bundleId)
-                                                                                     .setCategory(productCategory)
-                                                                                     .setBundleStartDate(migrationStartDate)
-                                                                                     .setAlignStartDate(migrationStartDate),
-                                                                             emptyEvents);
+        final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                      .setId(UUID.randomUUID())
+                                                                                      .setBundleId(bundleId)
+                                                                                      .setCategory(productCategory)
+                                                                                      .setBundleStartDate(migrationStartDate)
+                                                                                      .setAlignStartDate(migrationStartDate),
+                                                                              emptyEvents);
         return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
     }
 
@@ -157,13 +155,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
         final DateTime migrationStartDate = events[0].getEventTime();
         final List<EntitlementEvent> emptyEvents = Collections.emptyList();
-        final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
-                                                                                     .setId(UUID.randomUUID())
-                                                                                     .setBundleId(bundleId)
-                                                                                     .setCategory(productCategory)
-                                                                                     .setBundleStartDate(bundleStartDate)
-                                                                                     .setAlignStartDate(migrationStartDate),
-                                                                             emptyEvents);
+        final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                      .setId(UUID.randomUUID())
+                                                                                      .setBundleId(bundleId)
+                                                                                      .setCategory(productCategory)
+                                                                                      .setBundleStartDate(bundleStartDate)
+                                                                                      .setAlignStartDate(migrationStartDate),
+                                                                              emptyEvents);
         return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
     }
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
index 4721c66..db0af82 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
@@ -23,10 +23,11 @@ import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
 
 public interface SubscriptionApiService {
 
@@ -54,4 +55,6 @@ public interface SubscriptionApiService {
     public boolean changePlanWithPolicy(SubscriptionData subscription, String productName, BillingPeriod term,
                                         String priceList, DateTime requestedDate, ActionPolicy policy, CallContext context)
             throws EntitlementUserApiException;
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context);
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
index 1596030..f4b0420 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
@@ -29,17 +29,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
 import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 
@@ -48,18 +51,17 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
+public class DefaultEntitlementInternalApi extends EntitlementApiBase implements EntitlementInternalApi {
 
     private final Logger log = LoggerFactory.getLogger(DefaultEntitlementInternalApi.class);
 
-    private final EntitlementDao dao;
-    private final SubscriptionFactory subscriptionFactory;
 
     @Inject
     public DefaultEntitlementInternalApi(final EntitlementDao dao,
-                                         final SubscriptionFactory subscriptionFactory) {
-        this.dao = dao;
-        this.subscriptionFactory = subscriptionFactory;
+                                         final DefaultSubscriptionApiService apiService,
+                                         final Clock clock,
+                                         final CatalogService catalogService) {
+        super(dao, apiService, clock, catalogService);
     }
 
     @Override
@@ -68,26 +70,31 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final InternalTenantContext context) {
-        return dao.getSubscriptions(subscriptionFactory, bundleId, context);
+    public List<Subscription> getSubscriptionsForBundle(UUID bundleId,
+                                                        InternalTenantContext context) {
+        final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, context);
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
-    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, context);
+    public Subscription getBaseSubscription(UUID bundleId,
+                                            InternalTenantContext context) throws EntitlementUserApiException {
+        final Subscription result = dao.getBaseSubscription(bundleId, context);
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final UUID id, final InternalTenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, context);
+
+    public Subscription getSubscriptionFromId(UUID id,
+                                              InternalTenantContext context) throws EntitlementUserApiException {
+        final Subscription result = dao.getSubscriptionFromId(id, context);
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
@@ -105,8 +112,9 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
     }
 
     @Override
-    public void setChargedThroughDate(final UUID subscriptionId, final LocalDate localChargedThruDate, final InternalCallContext context) {
-        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, context);
+    public void setChargedThroughDate(UUID subscriptionId,
+                                      LocalDate localChargedThruDate, InternalCallContext context) {
+        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionId, context);
         final DateTime chargedThroughDate = localChargedThruDate.toDateTime(new LocalTime(subscription.getStartDate(), DateTimeZone.UTC), DateTimeZone.UTC);
         final SubscriptionBuilder builder = new SubscriptionBuilder(subscription)
                 .setChargedThroughDate(chargedThroughDate)
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
index f61d2a6..c44617a 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.entitlement.api.timeline;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -25,21 +26,25 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.NewEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
@@ -47,17 +52,21 @@ import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
+public class DefaultEntitlementTimelineApi extends EntitlementApiBase implements EntitlementTimelineApi {
 
-    private final EntitlementDao dao;
-    private final SubscriptionFactory factory;
     private final RepairEntitlementLifecycleDao repairDao;
     private final CatalogService catalogService;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final AddonUtils addonUtils;
+
+    private final SubscriptionApiService repairApiService;
 
     private enum RepairType {
         BASE_REPAIR,
@@ -66,14 +75,17 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
     }
 
     @Inject
-    public DefaultEntitlementTimelineApi(@Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionFactory factory, final CatalogService catalogService,
+    public DefaultEntitlementTimelineApi(final CatalogService catalogService,
+                                         final SubscriptionApiService apiService,
                                          @Named(DefaultEntitlementModule.REPAIR_NAMED) final RepairEntitlementLifecycleDao repairDao, final EntitlementDao dao,
-                                         final InternalCallContextFactory internalCallContextFactory) {
+                                         @Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionApiService repairApiService,
+                                         final InternalCallContextFactory internalCallContextFactory, final Clock clock, final AddonUtils addonUtils) {
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
-        this.dao = dao;
         this.repairDao = repairDao;
-        this.factory = factory;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.repairApiService = repairApiService;
+        this.addonUtils = addonUtils;
     }
 
     @Override
@@ -101,7 +113,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             if (bundle == null) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_UNKNOWN_BUNDLE, descBundle);
             }
-            final List<Subscription> subscriptions = dao.getSubscriptions(factory, bundle.getId(), internalCallContextFactory.createInternalTenantContext(context));
+            final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(bundle.getId(), internalCallContextFactory.createInternalTenantContext(context)));
             if (subscriptions.size() == 0) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, bundle.getId());
             }
@@ -113,6 +125,18 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
+    private List<SubscriptionDataRepair> convertToSubscriptionsDataRepair(List<Subscription> input) {
+        return new ArrayList<SubscriptionDataRepair>(Collections2.transform(input, new Function<Subscription, SubscriptionDataRepair>() {
+            @Override
+            public SubscriptionDataRepair apply(@Nullable final Subscription subscription) {
+                return convertToSubscriptionDataRepair((SubscriptionData) subscription);
+            }
+        }));
+    }
+    private SubscriptionDataRepair convertToSubscriptionDataRepair(SubscriptionData input) {
+        return new SubscriptionDataRepair(input, repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+    }
+
     @Override
     public BundleTimeline repairBundle(final BundleTimeline input, final boolean dryRun, final CallContext context) throws EntitlementRepairException {
         final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(context);
@@ -123,7 +147,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             }
 
             // Subscriptions are ordered with BASE subscription first-- if exists
-            final List<Subscription> subscriptions = dao.getSubscriptions(factory, input.getId(), tenantContext);
+            final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(input.getId(), tenantContext));
             if (subscriptions.size() == 0) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, input.getId());
             }
@@ -252,7 +276,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
-    private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+    private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
             throws EntitlementRepairException {
         if (!isBasePlanRecreate) {
             return;
@@ -269,7 +293,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
-    private void validateInputSubscriptionsKnown(final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+    private void validateInputSubscriptionsKnown(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
             throws EntitlementRepairException {
         for (final SubscriptionTimeline cur : input) {
             boolean found = false;
@@ -365,7 +389,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         return result;
     }
 
-    private String getViewId(final DateTime lastUpdateBundleDate, final List<Subscription> subscriptions) {
+    private String getViewId(final DateTime lastUpdateBundleDate, final List<SubscriptionDataRepair> subscriptions) {
         final StringBuilder tmp = new StringBuilder();
         long lastOrderedId = -1;
         for (final Subscription cur : subscriptions) {
@@ -412,7 +436,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         };
     }
 
-    private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<Subscription> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
+    private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
 
         final List<SubscriptionTimeline> result = new LinkedList<SubscriptionTimeline>();
         final Set<UUID> repairIds = new TreeSet<UUID>();
@@ -464,7 +488,9 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             }
         }
 
-        return (SubscriptionDataRepair) factory.createSubscription(builder, initialEvents);
+        final SubscriptionDataRepair subscriptiondataRepair = new SubscriptionDataRepair(builder, curData.getEvents(), repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+        subscriptiondataRepair.rebuildTransitions(curData.getEvents(), catalogService.getFullCatalog());
+        return subscriptiondataRepair;
     }
 
     private SubscriptionTimeline findAndCreateSubscriptionRepair(final UUID target, final List<SubscriptionTimeline> input) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
index 1870bfe..9f5d397 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
@@ -16,12 +16,17 @@
 
 package com.ning.billing.entitlement.api.timeline;
 
+import org.joda.time.DateTime;
+
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 
@@ -35,7 +40,14 @@ public class RepairSubscriptionApiService extends DefaultSubscriptionApiService 
                                         @Named(DefaultEntitlementModule.REPAIR_NAMED) final EntitlementDao dao,
                                         final CatalogService catalogService,
                                         final PlanAligner planAligner,
+                                        final AddonUtils addonUtils,
                                         final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, dao, catalogService, planAligner, internalCallContextFactory);
+        super(clock, dao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+    }
+
+    // Nothing to do for repair as we pass all the repair events in the stream
+    @Override
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+        return 0;
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
index 301f15f..310469c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
@@ -24,7 +24,6 @@ import org.joda.time.DateTime;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
-import com.ning.billing.catalog.api.Catalog;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
@@ -33,8 +32,8 @@ import com.ning.billing.catalog.api.Product;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -59,8 +58,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
     private final List<EntitlementEvent> initialEvents;
     private final InternalCallContextFactory internalCallContextFactory;
 
-    // Low level events are ONLY used for Repair APIs
-    private List<EntitlementEvent> events;
 
     public SubscriptionDataRepair(final SubscriptionBuilder builder, final List<EntitlementEvent> initialEvents, final SubscriptionApiService apiService,
                                   final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
@@ -74,6 +71,20 @@ public class SubscriptionDataRepair extends SubscriptionData {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
+
+    public SubscriptionDataRepair(final SubscriptionData subscriptionData, final SubscriptionApiService apiService,
+                                  final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
+                                  final InternalCallContextFactory internalCallContextFactory) {
+        super(subscriptionData, apiService , clock);
+        this.repairDao = dao;
+        this.addonUtils = addonUtils;
+        this.clock = clock;
+        this.catalogService = catalogService;
+        this.initialEvents = subscriptionData.getEvents();
+        this.internalCallContextFactory = internalCallContextFactory;
+    }
+
     DateTime getLastUserEventEffectiveDate() {
         DateTime res = null;
         for (final EntitlementEvent cur : events) {
@@ -185,12 +196,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
         }
     }
 
-    @Override
-    public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
-        this.events = inputEvents;
-        super.rebuildTransitions(inputEvents, catalog);
-    }
-
     public List<EntitlementEvent> getEvents() {
         return events;
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
index 466d466..651b297 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
@@ -29,7 +29,8 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.BundleTimeline;
@@ -37,7 +38,8 @@ import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -58,22 +60,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
+public class DefaultEntitlementTransferApi extends EntitlementApiBase implements EntitlementTransferApi {
 
-    private final Clock clock;
-    private final EntitlementDao dao;
     private final CatalogService catalogService;
-    private final SubscriptionFactory subscriptionFactory;
     private final EntitlementTimelineApi timelineApi;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementTransferApi(final Clock clock, final EntitlementDao dao, final EntitlementTimelineApi timelineApi, final CatalogService catalogService,
-                                         final SubscriptionFactory subscriptionFactory, final InternalCallContextFactory internalCallContextFactory) {
-        this.clock = clock;
-        this.dao = dao;
+                                         final SubscriptionApiService apiService, final InternalCallContextFactory internalCallContextFactory) {
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
-        this.subscriptionFactory = subscriptionFactory;
         this.timelineApi = timelineApi;
         this.internalCallContextFactory = internalCallContextFactory;
     }
@@ -220,7 +217,7 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
             DateTime bundleStartdate = null;
 
             for (final SubscriptionTimeline cur : bundleTimeline.getSubscriptions()) {
-                final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, cur.getId(), fromInternalCallContext);
+                final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(cur.getId(), fromInternalCallContext);
                 final List<ExistingEvent> existingEvents = cur.getExistingEvents();
                 final ProductCategory productCategory = existingEvents.get(0).getPlanPhaseSpecifier().getProductCategory();
                 if (productCategory == ProductCategory.ADD_ON) {
@@ -254,13 +251,13 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
                 }
 
                 // Create the new subscription for the new bundle on the new account
-                final SubscriptionData subscriptionData = subscriptionFactory.createSubscription(new SubscriptionBuilder()
-                                                                                                         .setId(UUID.randomUUID())
-                                                                                                         .setBundleId(subscriptionBundleData.getId())
-                                                                                                         .setCategory(productCategory)
-                                                                                                         .setBundleStartDate(effectiveTransferDate)
-                                                                                                         .setAlignStartDate(subscriptionAlignStartDate),
-                                                                                                 ImmutableList.<EntitlementEvent>of());
+                final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                              .setId(UUID.randomUUID())
+                                                                                              .setBundleId(subscriptionBundleData.getId())
+                                                                                              .setCategory(productCategory)
+                                                                                              .setBundleStartDate(effectiveTransferDate)
+                                                                                              .setAlignStartDate(subscriptionAlignStartDate),
+                                                                                      ImmutableList.<EntitlementEvent>of());
 
                 final List<EntitlementEvent> events = toEvents(existingEvents, subscriptionData, effectiveTransferDate, context);
                 final SubscriptionMigrationData curData = new SubscriptionMigrationData(subscriptionData, events, null);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
index 32d968c..4272038 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class DefaultEffectiveSubscriptionEvent extends DefaultSubscriptionEvent implements EffectiveSubscriptionInternalEvent {
+
     public DefaultEffectiveSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate, final Long accountRecordId, final Long tenantRecordId) {
         super(in, startDate, accountRecordId, tenantRecordId);
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
index d7b8d6e..d3d128c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
@@ -33,8 +33,7 @@ import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
 import com.ning.billing.entitlement.api.user.SubscriptionStatusDryRun.DryRunChangeReason;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -48,26 +47,19 @@ import com.ning.billing.util.clock.DefaultClock;
 
 import com.google.inject.Inject;
 
-public class DefaultEntitlementUserApi implements EntitlementUserApi {
+public class DefaultEntitlementUserApi extends EntitlementApiBase implements EntitlementUserApi {
 
-    private final Clock clock;
-    private final EntitlementDao dao;
     private final CatalogService catalogService;
-    private final DefaultSubscriptionApiService apiService;
     private final AddonUtils addonUtils;
-    private final SubscriptionFactory subscriptionFactory;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementUserApi(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
-                                     final DefaultSubscriptionApiService apiService, final SubscriptionFactory subscriptionFactory,
+                                     final DefaultSubscriptionApiService apiService,
                                      final AddonUtils addonUtils, final InternalCallContextFactory internalCallContextFactory) {
-        this.clock = clock;
-        this.apiService = apiService;
-        this.dao = dao;
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
         this.addonUtils = addonUtils;
-        this.subscriptionFactory = subscriptionFactory;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -82,11 +74,11 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
     @Override
     public Subscription getSubscriptionFromId(final UUID id, final TenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription result = dao.getSubscriptionFromId(id, internalCallContextFactory.createInternalTenantContext(context));
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
@@ -111,23 +103,26 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
     @Override
     public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) {
-        return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> internalSubscriptions = dao.getSubscriptionsForAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
     public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final TenantContext context) {
-        return dao.getSubscriptions(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
     public Subscription getBaseSubscription(final UUID bundleId, final TenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription result = dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
+
     @Override
     public SubscriptionBundle createBundleForAccount(final UUID accountId, final String bundleName, final CallContext context)
             throws EntitlementUserApiException {
@@ -162,7 +157,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
             }
 
             DateTime bundleStartDate = null;
-            final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+            final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
             switch (plan.getProduct().getCategory()) {
                 case BASE:
                     if (baseSubscription != null) {
@@ -231,7 +226,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
     public List<SubscriptionStatusDryRun> getDryRunChangePlanStatus(final UUID subscriptionId, @Nullable final String baseProductName,
                                                                     final DateTime requestedDate, final TenantContext context)
             throws EntitlementUserApiException {
-        final Subscription subscription = dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription subscription = dao.getSubscriptionFromId(subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
         if (subscription == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, subscriptionId);
         }
@@ -241,7 +236,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
         final List<SubscriptionStatusDryRun> result = new LinkedList<SubscriptionStatusDryRun>();
 
-        final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscriptionFactory, subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
         for (final Subscription cur : bundleSubscriptions) {
             if (cur.getId().equals(subscriptionId)) {
                 continue;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 3dd5850..d60a898 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -17,6 +17,8 @@
 package com.ning.billing.entitlement.api.user;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
@@ -37,11 +39,12 @@ import com.ning.billing.catalog.api.PlanSpecifier;
 import com.ning.billing.catalog.api.PriceList;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.Product;
+import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -68,18 +71,22 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
     private final EntitlementDao dao;
     private final CatalogService catalogService;
     private final PlanAligner planAligner;
+    private final AddonUtils addonUtils;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultSubscriptionApiService(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
-                                         final PlanAligner planAligner, final InternalCallContextFactory internalCallContextFactory) {
+                                         final PlanAligner planAligner, final AddonUtils addonUtils,
+                                         final InternalCallContextFactory internalCallContextFactory) {
         this.clock = clock;
         this.catalogService = catalogService;
         this.planAligner = planAligner;
         this.dao = dao;
+        this.addonUtils = addonUtils;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     public SubscriptionData createPlan(final SubscriptionBuilder builder, final Plan plan, final PhaseType initialPhase,
                                        final String realPriceList, final DateTime requestedDate, final DateTime effectiveDate, final DateTime processedDate,
@@ -215,6 +222,9 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
         dao.cancelSubscription(subscription, cancelEvent, internalCallContext, 0);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
@@ -354,9 +364,58 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         dao.changePlan(subscription, changeEvents, internalCallContext);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
 
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+
+        // If cancellation/change occur in the future, there is nothing to do
+        final DateTime now = clock.getUTCNow();
+        if (effectiveDate.compareTo(now) > 0) {
+            return 0;
+        }
+
+        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
+
+        final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
+
+        final List<SubscriptionData> subscriptionsToBeCancelled = new LinkedList<SubscriptionData>();
+        final List<EntitlementEvent> cancelEvents = new LinkedList<EntitlementEvent>();
+
+        for (final Subscription subscription : subscriptions) {
+            final SubscriptionData cur = (SubscriptionData) subscription;
+            if (cur.getState() == SubscriptionState.CANCELLED ||
+                cur.getCategory() != ProductCategory.ADD_ON) {
+                continue;
+            }
+
+            final Plan addonCurrentPlan = cur.getCurrentPlan();
+            if (baseProduct == null ||
+                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
+                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
+                //
+                // Perform AO cancellation using the effectiveDate of the BP
+                //
+                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
+                                                                                .setSubscriptionId(cur.getId())
+                                                                                .setActiveVersion(cur.getActiveVersion())
+                                                                                .setProcessedDate(now)
+                                                                                .setEffectiveDate(effectiveDate)
+                                                                                .setRequestedDate(now)
+                                                                                .setUserToken(context.getUserToken())
+                                                                                .setFromDisk(true));
+                subscriptionsToBeCancelled.add(cur);
+                cancelEvents.add(cancelEvent);
+            }
+        }
+
+        dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, context);
+        return subscriptionsToBeCancelled.size();
+    }
+
     private void validateRequestedDate(final SubscriptionData subscription, final DateTime now, final DateTime requestedDate)
             throws EntitlementUserApiException {
 
@@ -364,7 +423,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
         }
 
-        final SubscriptionTransition  previousTransition = subscription.getPreviousTransition();
+        final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
         if (previousTransition != null && previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
                                                   requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 2903504..7d88024 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -39,7 +39,6 @@ import com.ning.billing.catalog.api.PriceList;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Kind;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Order;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.TimeLimit;
@@ -85,6 +84,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
     //
     private LinkedList<SubscriptionTransitionData> transitions;
 
+    // Low level events are ONLY used for Repair APIs
+    protected List<EntitlementEvent> events;
+
+
+    public List<EntitlementEvent> getEvents() {
+        return events;
+    }
+
     // Transient object never returned at the API
     public SubscriptionData(final SubscriptionBuilder builder) {
         this(builder, null, null);
@@ -103,6 +110,22 @@ public class SubscriptionData extends EntityBase implements Subscription {
         this.paidThroughDate = builder.getPaidThroughDate();
     }
 
+    // Used for API to make sure we have a clock and an apiService set before we return the object
+    public SubscriptionData(final SubscriptionData internalSubscription, final SubscriptionApiService apiService, final Clock clock) {
+        super(internalSubscription.getId(), internalSubscription.getCreatedDate(), internalSubscription.getUpdatedDate());
+        this.apiService = apiService;
+        this.clock = clock;
+        this.bundleId = internalSubscription.getBundleId();
+        this.alignStartDate = internalSubscription.getAlignStartDate();
+        this.bundleStartDate = internalSubscription.getBundleStartDate();
+        this.category = internalSubscription.getCategory();
+        this.activeVersion = internalSubscription.getActiveVersion();
+        this.chargedThroughDate = internalSubscription.getChargedThroughDate();
+        this.paidThroughDate = internalSubscription.getPaidThroughDate();
+        this.transitions = new LinkedList<SubscriptionTransitionData>(internalSubscription.getAllTransitions());
+        this.events = internalSubscription.getEvents();
+    }
+
     @Override
     public UUID getBundleId() {
         return bundleId;
@@ -484,13 +507,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
                 "Failed to find CurrentPhaseStart id = %s", getId().toString()));
     }
 
-    public void rebuildTransitions(final List<EntitlementEvent> inputEvents,
-            final Catalog catalog) {
+    public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
 
         if (inputEvents == null) {
             return;
         }
 
+        this.events = inputEvents;
+
         SubscriptionState nextState = null;
         String nextPlanName = null;
         String nextPhaseName = null;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 27c9eea..ce206ed 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -28,12 +28,10 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.Product;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.util.config.EntitlementConfig;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.EntitlementService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
@@ -80,29 +78,25 @@ public class Engine implements EventListener, EntitlementService {
     private final PlanAligner planAligner;
     private final AddonUtils addonUtils;
     private final InternalBus eventBus;
-    private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
-    private final SubscriptionFactory subscriptionFactory;
     private final InternalCallContextFactory internalCallContextFactory;
-
     private NotificationQueue subscriptionEventQueue;
+    private final SubscriptionApiService apiService;
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
-                  final EntitlementConfig config,
                   final AddonUtils addonUtils, final InternalBus eventBus,
                   final NotificationQueueService notificationQueueService,
-                  final SubscriptionFactory subscriptionFactory,
-                  final InternalCallContextFactory internalCallContextFactory) {
+                  final InternalCallContextFactory internalCallContextFactory,
+                  final SubscriptionApiService apiService) {
         this.clock = clock;
         this.dao = dao;
         this.planAligner = planAligner;
         this.addonUtils = addonUtils;
-        this.config = config;
         this.eventBus = eventBus;
         this.notificationQueueService = notificationQueueService;
-        this.subscriptionFactory = subscriptionFactory;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.apiService = apiService;
     }
 
     @Override
@@ -115,7 +109,7 @@ public class Engine implements EventListener, EntitlementService {
         try {
             final NotificationQueueHandler queueHandler = new NotificationQueueHandler() {
                 @Override
-                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID fromNotificationQueueUserToken, final Long accountRecordId, final Long tenantRecordId) {
                     if (!(inputKey instanceof EntitlementNotificationKey)) {
                         log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
                         return;
@@ -128,28 +122,16 @@ public class Engine implements EventListener, EntitlementService {
                         return;
                     }
 
-                    final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
+                    // TODO STEPH?
+                    final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : fromNotificationQueueUserToken;
                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                     processEventReady(event, key.getSeqId(), context);
                 }
             };
 
-            final NotificationConfig notificationConfig = new NotificationConfig() {
-                @Override
-                public long getSleepTimeMs() {
-                    return config.getSleepTimeMs();
-                }
-
-                @Override
-                public boolean isNotificationProcessingOff() {
-                    return config.isNotificationProcessingOff();
-                }
-            };
-
             subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
                                                                                       NOTIFICATION_QUEUE_NAME,
-                                                                                      queueHandler,
-                                                                                      notificationConfig);
+                                                                                      queueHandler);
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
@@ -174,7 +156,7 @@ public class Engine implements EventListener, EntitlementService {
             return;
         }
 
-        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, event.getSubscriptionId(), context);
+        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId(), context);
         if (subscription == null) {
             log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
             return;
@@ -187,7 +169,6 @@ public class Engine implements EventListener, EntitlementService {
         //
         // Do any internal processing on that event before we send the event to the bus
         //
-
         int theRealSeqId = seqId;
         if (event.getType() == EventType.PHASE) {
             onPhaseEvent(subscription, context);
@@ -198,7 +179,7 @@ public class Engine implements EventListener, EntitlementService {
         try {
             final SubscriptionTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
             final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
-                    context.getAccountRecordId(), context.getTenantRecordId());
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
             eventBus.post(busEvent, context);
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
@@ -221,47 +202,8 @@ public class Engine implements EventListener, EntitlementService {
     }
 
     private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final InternalCallContext context) {
-        final DateTime now = clock.getUTCNow();
-        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-
-        final List<Subscription> subscriptions = dao.getSubscriptions(subscriptionFactory, baseSubscription.getBundleId(), context);
-
-        final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
-        final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
-        for (final Subscription subscription : subscriptions) {
-            final SubscriptionData cur = (SubscriptionData) subscription;
-            if (cur.getState() == SubscriptionState.CANCELLED ||
-                cur.getCategory() != ProductCategory.ADD_ON) {
-                continue;
-            }
+        return apiService.cancelAddOnsIfRequired(baseSubscription, event.getEffectiveDate(), context);
+    }
 
-            final Plan addonCurrentPlan = cur.getCurrentPlan();
-            if (baseProduct == null ||
-                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
-                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
-                //
-                // Perform AO cancellation using the effectiveDate of the BP
-                //
-                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                                                                                .setSubscriptionId(cur.getId())
-                                                                                .setActiveVersion(cur.getActiveVersion())
-                                                                                .setProcessedDate(now)
-                                                                                .setEffectiveDate(event.getEffectiveDate())
-                                                                                .setRequestedDate(now)
-                                                                                .setUserToken(context.getUserToken())
-                                                                                .setFromDisk(true));
 
-                addOnCancellations.put(cur.getId(), cancelEvent);
-                addOnCancellationSubscriptions.put(cur.getId(), cur);
-            }
-        }
-
-        final int addOnSize = addOnCancellations.size();
-        int cancelSeq = addOnSize - 1;
-        for (final UUID key : addOnCancellations.keySet()) {
-            dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
-            cancelSeq--;
-        }
-        return addOnSize;
-    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index aecdd99..7f077ce 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -39,19 +40,20 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.entitlement.api.transfer.TransferCancelData;
+import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
@@ -76,6 +78,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.events.RepairEntitlementInternalEvent;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -98,6 +101,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
     private final NotificationQueueService notificationQueueService;
     private final AddonUtils addonUtils;
     private final InternalBus eventBus;
+    private final CatalogService catalogService;
 
     @Inject
     public DefaultEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
@@ -107,6 +111,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         this.notificationQueueService = notificationQueueService;
         this.addonUtils = addonUtils;
         this.eventBus = eventBus;
+        this.catalogService = catalogService;
     }
 
     @Override
@@ -206,29 +211,26 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
-        return getBaseSubscription(factory, bundleId, true, context);
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
+        return getBaseSubscription(bundleId, true, context);
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
-        return buildSubscription(factory, getSubscriptionFromId(subscriptionId, context), context);
-    }
-
-    private Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
+        final Subscription shellSubscription = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
             @Override
             public Subscription inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final SubscriptionModelDao model = entitySqlDaoWrapperFactory.become(SubscriptionSqlDao.class).getById(subscriptionId.toString(), context);
                 return SubscriptionModelDao.toSubscription(model);
             }
         });
+        return buildSubscription(shellSubscription, context);
     }
 
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
-        return buildBundleSubscriptions(bundleId, factory, getSubscriptionFromBundleId(bundleId, context), context);
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
+        return buildBundleSubscriptions(bundleId, getSubscriptionFromBundleId(bundleId, context), context);
     }
 
     private List<Subscription> getSubscriptionFromBundleId(final UUID bundleId, final InternalTenantContext context) {
@@ -248,7 +250,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
                                                                final String bundleKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<Subscription>>() {
             @Override
@@ -257,7 +259,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 if (bundleModel == null) {
                     return Collections.emptyList();
                 }
-                return getSubscriptions(factory, bundleModel.getId(), context);
+                return getSubscriptions(bundleModel.getId(), context);
             }
         });
     }
@@ -389,10 +391,10 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 final EntitlementEventSqlDao eventsDaoFromSameTransaction = entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class);
                 for (final EntitlementEvent cur : initialEvents) {
                     eventsDaoFromSameTransaction.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
+
                 }
                 // Notify the Bus of the latest requested change, if needed
                 if (initialEvents.size() > 0) {
@@ -403,6 +405,8 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
+
+
     @Override
     public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -412,11 +416,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
                 for (final EntitlementEvent cur : recreateEvents) {
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
 
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -428,6 +430,24 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                for (int i = 0; i < subscriptions.size(); i++) {
+                    final SubscriptionData subscription = subscriptions.get(i);
+                    final EntitlementEvent cancelEvent = cancelEvents.get(i);
+                    cancelSubscriptionFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, context, i);
+                }
+                return null;
+            }
+        });
+    }
+
+
+
+    @Override
     public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
@@ -498,10 +518,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 for (final EntitlementEvent cur : changeEventsTweakedWithMigrateBilling) {
 
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -608,10 +627,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
         final UUID subscriptionId = subscription.getId();
         cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
         entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class).create(new EntitlementEventModelDao(cancelEvent), context);
-        recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                cancelEvent.getEffectiveDate(),
-                                                new EntitlementNotificationKey(cancelEvent.getId(), seqId),
-                                                context);
+
+        final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
+        recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, context);
 
         // Notify the Bus of the requested change
         notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, context);
@@ -663,14 +681,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
         }
     }
 
-    private Subscription buildSubscription(final SubscriptionFactory factory, final Subscription input, final InternalTenantContext context) {
+    private Subscription buildSubscription(final Subscription input, final InternalTenantContext context) {
         if (input == null) {
             return null;
         }
 
         final List<Subscription> bundleInput = new ArrayList<Subscription>();
         if (input.getCategory() == ProductCategory.ADD_ON) {
-            final Subscription baseSubscription = getBaseSubscription(factory, input.getBundleId(), false, context);
+            final Subscription baseSubscription = getBaseSubscription(input.getBundleId(), false, context);
             if (baseSubscription == null) {
                 return null;
             }
@@ -681,7 +699,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
             bundleInput.add(input);
         }
 
-        final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), factory, bundleInput, context);
+        final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), bundleInput, context);
         for (final Subscription cur : reloadedSubscriptions) {
             if (cur.getId().equals(input.getId())) {
                 return cur;
@@ -691,7 +709,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         throw new EntitlementError("Unexpected code path in buildSubscription");
     }
 
-    private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final SubscriptionFactory factory, final List<Subscription> input, final InternalTenantContext context) {
+    private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final List<Subscription> input, final InternalTenantContext context) {
         if (input == null || input.size() == 0) {
             return Collections.emptyList();
         }
@@ -714,7 +732,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         final List<Subscription> result = new ArrayList<Subscription>(input.size());
         for (final Subscription cur : input) {
             final List<EntitlementEvent> events = getEventsForSubscription(cur.getId(), context);
-            Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+            Subscription reloaded = createSubscriptionForInternalUse(cur, events);
 
             switch (cur.getCategory()) {
                 case BASE:
@@ -752,7 +770,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
                         events.add(addOnCancelEvent);
                         // Finally reload subscription with full set of events
-                        reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+                        reloaded = createSubscriptionForInternalUse(cur, events);
                     }
                     break;
                 default:
@@ -838,26 +856,58 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
-    private Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
+    private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
+        final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
+        if (events.size() > 0) {
+            result.rebuildTransitions(events, catalogService.getFullCatalog());
+        }
+        return result;
+    }
+
+    private Subscription getBaseSubscription(final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
         final List<Subscription> subscriptions = getSubscriptionFromBundleId(bundleId, context);
         for (final Subscription cur : subscriptions) {
             if (cur.getCategory() == ProductCategory.BASE) {
-                return rebuildSubscription ? buildSubscription(factory, cur, context) : cur;
+                return rebuildSubscription ? buildSubscription(cur, context) : cur;
             }
         }
         return null;
     }
 
-    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
-                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+
+    //
+    // Either records a notfication or sends a bus event is operation is immediate
+    //
+    private void recordBusOrFutureNotificationFromTransaction(final SubscriptionData subscription, final EntitlementEvent event, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final boolean busEvent,
+                                                              final int seqId, final InternalCallContext context) {
+        if (busEvent) {
+            notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+        } else {
+            recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+                                                    event.getEffectiveDate(),
+                                                    new EntitlementNotificationKey(event.getId()),
+                                                    context);
+        }
+    }
+
+    //
+    // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
+    // - CREATE,
+    // - IMM CANCEL or CHANGE
+    //
+    private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
+                                            final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
         try {
-            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
-                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, null, notificationKey, context);
-        } catch (NoSuchNotificationQueue e) {
-            throw new RuntimeException(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+            final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
+
+            final SubscriptionTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
+            final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
+
+
+            eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+        } catch (EventBusException e) {
+            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
         }
     }
 
@@ -870,6 +920,18 @@ public class DefaultEntitlementDao implements EntitlementDao {
         }
     }
 
+    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
+                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+        try {
+            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
     private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
                                                   final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
 
@@ -902,4 +964,20 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
         transBundleDao.create(new SubscriptionBundleModelDao(bundleData), context);
     }
+
+    //
+    // Creates a copy of the existing subscriptions whose 'transitions' will reflect the new event
+    //
+    private SubscriptionData createSubscriptionWithNewEvent(final SubscriptionData subscription, EntitlementEvent newEvent) {
+
+        final SubscriptionData subscriptionWithNewEvent = new SubscriptionData(subscription, null, clock);
+        final List<EntitlementEvent> allEvents = new LinkedList<EntitlementEvent>();
+        if (subscriptionWithNewEvent.getEvents() != null) {
+            allEvents.addAll(subscriptionWithNewEvent.getEvents());
+        }
+        allEvents.add(newEvent);
+        subscriptionWithNewEvent.rebuildTransitions(allEvents, catalogService.getFullCatalog());
+        return subscriptionWithNewEvent;
+    }
+
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 1c17929..2b83ab2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -20,7 +20,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
@@ -46,17 +45,17 @@ public interface EntitlementDao {
 
     public SubscriptionBundle createSubscriptionBundle(SubscriptionBundleData bundle, InternalCallContext context);
 
-    public Subscription getSubscriptionFromId(SubscriptionFactory factory, UUID subscriptionId, InternalTenantContext context);
+    public Subscription getSubscriptionFromId(UUID subscriptionId, InternalTenantContext context);
 
     // ACCOUNT retrieval
     public UUID getAccountIdFromSubscriptionId(UUID subscriptionId, InternalTenantContext context);
 
     // Subscription retrieval
-    public Subscription getBaseSubscription(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+    public Subscription getBaseSubscription(UUID bundleId, InternalTenantContext context);
 
-    public List<Subscription> getSubscriptions(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+    public List<Subscription> getSubscriptions(UUID bundleId, InternalTenantContext context);
 
-    public List<Subscription> getSubscriptionsForAccountAndKey(SubscriptionFactory factory, UUID accountId, String bundleKey, InternalTenantContext context);
+    public List<Subscription> getSubscriptionsForAccountAndKey(UUID accountId, String bundleKey, InternalTenantContext context);
 
     // Update
     public void updateChargedThroughDate(SubscriptionData subscription, InternalCallContext context);
@@ -79,6 +78,8 @@ public interface EntitlementDao {
 
     public void cancelSubscription(SubscriptionData subscription, EntitlementEvent cancelEvent, InternalCallContext context, int cancelSeq);
 
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context);
+
     public void uncancelSubscription(SubscriptionData subscription, List<EntitlementEvent> uncancelEvents, InternalCallContext context);
 
     public void changePlan(SubscriptionData subscription, List<EntitlementEvent> changeEvents, InternalCallContext context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
index 0f0e18d..b208f18 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
@@ -21,8 +21,8 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.util.dao.TableName;
 import com.ning.billing.util.entity.EntityBase;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index b4d3899..38695ac 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -176,6 +175,10 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         addEvents(subscription.getId(), changeEvents);
     }
@@ -228,7 +231,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
@@ -238,17 +241,17 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
                                                                final String bundleKey, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
index 459cae7..f3dc0b9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
@@ -24,7 +24,6 @@ import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.EntitlementService;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
 import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
@@ -32,12 +31,10 @@ import com.ning.billing.entitlement.api.timeline.DefaultEntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
 import com.ning.billing.entitlement.api.timeline.RepairSubscriptionApiService;
-import com.ning.billing.entitlement.api.timeline.RepairSubscriptionFactory;
 import com.ning.billing.entitlement.api.transfer.DefaultEntitlementTransferApi;
 import com.ning.billing.entitlement.api.transfer.EntitlementTransferApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.core.Engine;
@@ -68,9 +65,6 @@ public class DefaultEntitlementModule extends AbstractModule implements Entitlem
 
     protected void installEntitlementCore() {
 
-        bind(SubscriptionFactory.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionFactory.class).asEagerSingleton();
-        bind(SubscriptionFactory.class).to(DefaultSubscriptionFactory.class).asEagerSingleton();
-
         bind(SubscriptionApiService.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionApiService.class).asEagerSingleton();
         bind(SubscriptionApiService.class).to(DefaultSubscriptionApiService.class).asEagerSingleton();
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
index 7a3ceaf..38c949f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
@@ -33,8 +33,8 @@ import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.io.VersionedCatalogLoader;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.util.config.CatalogConfig;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
@@ -164,7 +164,7 @@ public class TestPlanAligner extends KillbillTestSuite {
     }
 
     private SubscriptionData createSubscriptionStartedInThePast(final String productName, final PhaseType phaseType) {
-        final DefaultSubscriptionFactory.SubscriptionBuilder builder = new DefaultSubscriptionFactory.SubscriptionBuilder();
+        final SubscriptionBuilder builder = new SubscriptionBuilder();
         builder.setBundleStartDate(clock.getUTCNow().minusHours(10));
         // Make sure to set the dates apart
         builder.setAlignStartDate(new DateTime(builder.getBundleStartDate().plusHours(5)));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 0b95f47..bd05062 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -275,6 +275,10 @@ public abstract class TestApiBase extends EntitlementTestSuiteWithEmbeddedDB imp
                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSet, null),
                                                                                                    requestedDate == null ? clock.getUTCNow() : requestedDate, callContext);
         assertNotNull(subscription);
+
+
+        //try {Thread.sleep(100000000); } catch (Exception e) {};
+
         assertTrue(testListener.isCompleted(5000));
         return subscription;
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
index db3df23..8052a4f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
@@ -342,7 +342,7 @@ public class TestRepairBP extends TestApiBaseRepair {
 
         assertEquals(dryRunBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         assertEquals(dryRunBaseSubscription.getBundleId(), bundle.getId());
-        assertEquals(dryRunBaseSubscription.getStartDate(), baseSubscription.getStartDate());
+        assertTrue(dryRunBaseSubscription.getStartDate().compareTo(baseSubscription.getStartDate()) == 0);
 
         Plan currentPlan = dryRunBaseSubscription.getCurrentPlan();
         assertNotNull(currentPlan);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
index 2f21fe2..cf792cc 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
@@ -35,11 +35,11 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.EntitlementTestSuite;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
@@ -63,10 +63,10 @@ public class TestDefaultEntitlementTransferApi extends EntitlementTestSuite {
     public void setUp() throws Exception {
         final EntitlementDao dao = Mockito.mock(EntitlementDao.class);
         final CatalogService catalogService = new MockCatalogService(new MockCatalog());
-        final SubscriptionFactory subscriptionFactory = Mockito.mock(SubscriptionFactory.class);
+        final SubscriptionApiService apiService =  Mockito.mock(SubscriptionApiService.class);
         final EntitlementTimelineApi timelineApi = Mockito.mock(EntitlementTimelineApi.class);
         final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(IDBI.class), clock);
-        transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, subscriptionFactory, internalCallContextFactory);
+        transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, apiService, internalCallContextFactory);
     }
 
     @Test(groups = "fast")
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index b73c1e3..a7f3191 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -33,14 +33,13 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.catalog.api.TimeUnit;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.entitlement.api.transfer.TransferCancelData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -141,10 +140,10 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
         for (final Subscription cur : subscriptions) {
             if (cur.getId().equals(subscriptionId)) {
-                return buildSubscription(factory, (SubscriptionData) cur, context);
+                return buildSubscription((SubscriptionData) cur, context);
             }
         }
         return null;
@@ -156,11 +155,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId, final String bundleKey, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final InternalTenantContext context) {
 
         for (final SubscriptionBundle cur : bundles) {
             if (cur.getExternalKey().equals(bundleKey) && cur.getAccountId().equals(bundleKey)) {
-                return getSubscriptions(factory, cur.getId(), context);
+                return getSubscriptions(cur.getId(), context);
             }
         }
         return Collections.emptyList();
@@ -175,7 +174,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
                 recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()), context);
             }
         }
-        final Subscription updatedSubscription = buildSubscription(null, subscription, context);
+        final Subscription updatedSubscription = buildSubscription(subscription, context);
         subscriptions.add(updatedSubscription);
     }
 
@@ -190,11 +189,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
         final List<Subscription> results = new ArrayList<Subscription>();
         for (final Subscription cur : subscriptions) {
             if (cur.getBundleId().equals(bundleId)) {
-                results.add(buildSubscription(factory, (SubscriptionData) cur, context));
+                results.add(buildSubscription((SubscriptionData) cur, context));
             }
         }
         return results;
@@ -229,11 +228,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
         for (final Subscription cur : subscriptions) {
             if (cur.getBundleId().equals(bundleId) &&
                 cur.getCurrentPlan().getProduct().getCategory() == ProductCategory.BASE) {
-                return buildSubscription(factory, (SubscriptionData) cur, context);
+                return buildSubscription((SubscriptionData) cur, context);
             }
         }
         return null;
@@ -245,16 +244,13 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
         insertEvent(nextPhase, context);
     }
 
-    private Subscription buildSubscription(final SubscriptionFactory factory, final SubscriptionData in, final InternalTenantContext context) {
-        if (factory != null) {
-            return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId(), context));
-        } else {
+    private Subscription buildSubscription(final SubscriptionData in, final InternalTenantContext context) {
             final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
             if (events.size() > 0) {
                 subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
             }
             return subscription;
-        }
+
     }
 
     @Override
@@ -284,6 +280,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+        synchronized (events) {
+            for (int i = 0; i < subscriptions.size(); i++) {
+                cancelSubscription(subscriptions.get(i), cancelEvents.get(i), context, 0);
+            }
+        }
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         synchronized (events) {
             cancelNextChangeEvent(subscription.getId());
@@ -303,7 +308,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     private void cancelNextPhaseEvent(final UUID subscriptionId, final InternalTenantContext context) {
-        final Subscription curSubscription = getSubscriptionFromId(null, subscriptionId, context);
+        final Subscription curSubscription = getSubscriptionFromId(subscriptionId, context);
         if (curSubscription.getCurrentPhase() == null ||
             curSubscription.getCurrentPhase().getDuration().getUnit() == TimeUnit.UNLIMITED) {
             return;
@@ -412,7 +417,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, null, notificationKey, context);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey, context);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index fff17a9..a580f01 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,6 +17,7 @@
 package com.ning.billing.entitlement.glue;
 
 import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.IDBI;
 
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -28,6 +29,7 @@ import com.ning.billing.util.callcontext.MockCallContextSqlDao;
 import com.ning.billing.util.glue.BusModule;
 import com.ning.billing.util.glue.BusModule.BusType;
 import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.name.Names;
@@ -44,6 +46,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
 
     private void installNotificationQueue() {
         bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
+    }
+
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
     }
 
     protected void installDBI() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index fa5da08..3f43198 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -305,6 +305,11 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
         return generator.generateInvoice(account, invoice, manualPay);
     }
 
+    @Override
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context) {
+        dao.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, internalCallContextFactory.createInternalCallContext(accountId, context));
+    }
+
     private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final InternalCallContext context) {
         try {
             eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()), context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index e928a24..f5aa44e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -203,7 +203,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                     // Now we check whether we generated any credit that could be used on some unpaid invoices
                     useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
 
-                    notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions);
+                    notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions, context.getUserToken());
 
                     // Create associated payments
                     final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
@@ -791,6 +791,16 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         });
     }
 
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+                return null;
+            }
+        });
+    }
+
     private void useExistingCBAFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws InvoiceApiException, EntityPersistenceException {
 
         final BigDecimal accountCBA = getAccountCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
@@ -983,10 +993,11 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         invoice.addPayments(invoicePayments);
     }
 
-    private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final Map<UUID, DateTime> callbackDateTimePerSubscriptions) {
+    private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+                                             final Map<UUID, DateTime> callbackDateTimePerSubscriptions, final UUID userToken) {
         for (final UUID subscriptionId : callbackDateTimePerSubscriptions.keySet()) {
             final DateTime callbackDateTimeUTC = callbackDateTimePerSubscriptions.get(subscriptionId);
-            nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC);
+            nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC, userToken);
         }
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
index dbaa474..3e76427 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
@@ -163,4 +163,11 @@ public interface InvoiceDao {
     void deleteCBA(UUID accountId, UUID invoiceId, UUID invoiceItemId, InternalCallContext context) throws InvoiceApiException;
 
     void notifyOfPayment(InvoicePaymentModelDao invoicePayment, InternalCallContext context);
+
+    /**
+     *
+     * @param accountId the account for which we need to rebalance the CBA
+     * @param context the callContext
+     */
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context);
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
index 25bb819..74914d7 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
@@ -73,9 +73,9 @@ public class InvoiceListener {
         }
     }
 
-    public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+    public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscription(subscriptionId, eventDateTime, context);
         } catch (InvoiceApiException e) {
             log.error(e.getMessage());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 6e3a8d7..464376d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -68,21 +68,10 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
     @Override
     public void initialize() throws NotificationQueueAlreadyExists {
-        final NotificationConfig notificationConfig = new NotificationConfig() {
-            @Override
-            public long getSleepTimeMs() {
-                return config.getSleepTimeMs();
-            }
-
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return config.isNotificationProcessingOff();
-            }
-        };
 
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 try {
                     if (!(notificationKey instanceof NextBillingDateNotificationKey)) {
                         log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
@@ -95,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                         if (subscription == null) {
                             log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
                         } else {
-                            processEvent(key.getUuidKey(), eventDate, accountRecordId, tenantRecordId);
+                            processEvent(key.getUuidKey(), eventDate, userToken, accountRecordId, tenantRecordId);
                         }
                     } catch (EntitlementUserApiException e) {
                         log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
@@ -108,8 +97,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
         nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                             NEXT_BILLING_DATE_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler,
-                                                                            notificationConfig);
+                                                                            notificationQueueHandler);
     }
 
     @Override
@@ -125,7 +113,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
         }
     }
 
-    private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-        listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, accountRecordId, tenantRecordId);
+    private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, userToken, accountRecordId, tenantRecordId);
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index f923aff..c17af59 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -52,8 +52,8 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
 
     @Override
     public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
-                                              final UUID subscriptionId, final DateTime futureNotificationTime) {
-        final InternalCallContext context = createCallContext(accountId);
+                                              final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
+        final InternalCallContext context = createCallContext(accountId, userToken);
 
         final NotificationQueue nextBillingQueue;
         try {
@@ -61,7 +61,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
             log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
 
-            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, accountId,
+            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime,
                                                                      new NextBillingDateNotificationKey(subscriptionId), context);
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
@@ -70,7 +70,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
         }
     }
 
-    private InternalCallContext createCallContext(final UUID accountId) {
-        return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+    private InternalCallContext createCallContext(final UUID accountId, final UUID userToken) {
+        return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
index 95955f7..1a8da73 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -25,7 +25,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 
 public interface NextBillingDatePoster {
 
-    void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
-                                       UUID subscriptionId, DateTime futureNotificationTime);
-
+    void insertNextBillingNotification(EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
+                                       UUID subscriptionId, DateTime futureNotificationTime, UUID userToken);
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
index d4ee2e1..43afee2 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
@@ -23,6 +23,8 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
@@ -69,6 +71,8 @@ import com.google.inject.Inject;
 @Guice(modules = {MockModuleNoEntitlement.class})
 public abstract class InvoiceApiTestBase extends InvoicingTestBase {
 
+    private static final Logger log = LoggerFactory.getLogger(InvoiceApiTestBase.class);
+
     protected static final Currency accountCurrency = Currency.USD;
 
     @Inject
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 2949653..7559994 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -66,16 +66,6 @@ public class InvoiceDaoTestBase extends InvoicingTestBase {
 
     private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
         @Override
-        public long getSleepTimeMs() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean isNotificationProcessingOff() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
         public int getNumberOfMonthsInFuture() {
             return 36;
         }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 6861d13..9371dce 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -182,6 +182,10 @@ public class MockInvoiceDao implements InvoiceDao {
     }
 
     @Override
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+    }
+
+    @Override
     public BigDecimal getAccountBalance(final UUID accountId, final InternalTenantContext context) {
         BigDecimal balance = BigDecimal.ZERO;
 
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
index 5864d95..9c4087a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
@@ -82,11 +82,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
         final Clock clock = new DefaultClock();
         final InvoiceConfig invoiceConfig = new InvoiceConfig() {
             @Override
-            public long getSleepTimeMs() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
             public int getNumberOfMonthsInFuture() {
                 return 36;
             }
@@ -95,11 +90,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
             public boolean isEmailNotificationsEnabled() {
                 return false;
             }
-
-            @Override
-            public boolean isNotificationProcessingOff() {
-                throw new UnsupportedOperationException();
-            }
         };
         this.generator = new DefaultInvoiceGenerator(clock, invoiceConfig);
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
index 1fd99d3..c5c7168 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
@@ -67,21 +67,11 @@ public class TestDefaultInvoiceGeneratorUnit extends InvoicingTestBase {
         clock = new ClockMock();
         gen = new TestDefaultInvoiceGeneratorMock(clock, new InvoiceConfig() {
             @Override
-            public boolean isNotificationProcessingOff() {
-                return false;
-            }
-
-            @Override
             public boolean isEmailNotificationsEnabled() {
                 return false;
             }
 
             @Override
-            public long getSleepTimeMs() {
-                return 100;
-            }
-
-            @Override
             public int getNumberOfMonthsInFuture() {
                 return 5;
             }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
index 6b77d80..54091f5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
@@ -26,6 +26,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 public class MockNextBillingDatePoster implements NextBillingDatePoster {
 
     @Override
-    public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final UUID subscriptionId, final DateTime futureNotificationTime) {
+    public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+                                              final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
     }
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 4ea0e9d..15d08b6 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -92,8 +92,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB 
         }
 
         @Override
-        public void handleNextBillingDateEvent(final UUID subscriptionId,
-                                               final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+        public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
             eventCount++;
             latestSubscriptionId = subscriptionId;
         }
@@ -182,7 +181,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB 
         entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime);
+                poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime, UUID.randomUUID());
                 return null;
             }
         });
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
index 0162c8f..f07276d 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
@@ -332,6 +332,27 @@ public class AccountResource extends JaxRsResourceBase {
         return Response.status(Status.OK).build();
     }
 
+
+    /*
+     * ************************** INVOICE CBA REBALANCING ********************************
+     */
+    @POST
+    @Path("/{accountId:" + UUID_PATTERN + "}/" + CBA_REBALANCING)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON)
+    public Response rebalanceExistingCBAOnAccount(@PathParam("accountId") final String accountIdString,
+                                   @HeaderParam(HDR_CREATED_BY) final String createdBy,
+                                  @HeaderParam(HDR_REASON) final String reason,
+                                  @HeaderParam(HDR_COMMENT) final String comment,
+                                  @javax.ws.rs.core.Context final HttpServletRequest request) throws AccountApiException {
+
+        final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+        final UUID accountId = UUID.fromString(accountIdString);
+
+        invoiceApi.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, callContext);
+        return Response.status(Status.OK).build();
+    }
+
     /*
      * ************************** PAYMENTS ********************************
      */
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 8384cf9..7d0ec23 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -132,4 +132,7 @@ public interface JaxrsResource {
 
     public static final String EXPORT = "export";
     public static final String EXPORT_PATH = PREFIX + "/" + EXPORT;
+
+    public static final String CBA_REBALANCING = "cbaRebalancing";
+
 }

meter/pom.xml 3(+2 -1)

diff --git a/meter/pom.xml b/meter/pom.xml
index e374441..395f26e 100644
--- a/meter/pom.xml
+++ b/meter/pom.xml
@@ -8,7 +8,8 @@
     OR CONDITIONS OF ANY KIND, either express or implied. See the ~ License for
     the specific language governing permissions and limitations ~ under the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>com.ning.billing</groupId>
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 39b7ef4..d107468 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -16,15 +16,16 @@
 
 package com.ning.billing.ovedue.notification;
 
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.overdue.OverdueProperties;
 import com.ning.billing.overdue.listener.OverdueListener;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -55,21 +56,9 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
 
     @Override
     public void initialize() {
-        final NotificationConfig notificationConfig = new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return config.isNotificationProcessingOff();
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return config.getSleepTimeMs();
-            }
-        };
-
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 try {
                     if (!(notificationKey instanceof OverdueCheckNotificationKey)) {
                         log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
@@ -77,7 +66,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
                     }
 
                     final OverdueCheckNotificationKey key = (OverdueCheckNotificationKey) notificationKey;
-                    listener.handleNextOverdueCheck(key, accountRecordId, tenantRecordId);
+                    listener.handleNextOverdueCheck(key, userToken, accountRecordId, tenantRecordId);
                 } catch (IllegalArgumentException e) {
                     log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
                 }
@@ -88,8 +77,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
         try {
             overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
                                                                             OVERDUE_CHECK_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler,
-                                                                            notificationConfig);
+                                                                            notificationQueueHandler);
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index ed401f1..2cb149e 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -52,7 +52,7 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
             log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
 
-            checkOverdueQueue.recordFutureNotification(futureNotificationTime, null, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
+            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
         } catch (IOException e) {
diff --git a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
index 86af3de..1733da5 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
@@ -67,10 +67,10 @@ public class OverdueListener {
         dispatcher.processOverdueForAccount(accountId, createCallContext(event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId()));
     }
 
-    public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final Long accountRecordId, final Long tenantRecordId) {
+    public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         log.info(String.format("Received OD checkup notification for type = %s, id = %s",
                 notificationKey.getType(), notificationKey.getUuidKey()));
-        dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(null, accountRecordId, tenantRecordId));
+        dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(userToken, accountRecordId, tenantRecordId));
     }
 
     private InternalCallContext createCallContext(final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index 084e6c6..ff3a672 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -64,7 +64,7 @@ import com.ning.billing.util.email.templates.TemplateModule;
 import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.globallocker.MySqlGlobalLocker;
 import com.ning.billing.util.glue.BusModule;
-import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.glue.NotificationQueueModule;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -80,6 +80,7 @@ import static com.jayway.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
 public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
+
     private Clock clock;
     private DefaultOverdueCheckNotifier notifier;
 
@@ -88,6 +89,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
     private NotificationQueueService notificationQueueService;
 
     private static final class OverdueListenerMock extends OverdueListener {
+
         int eventCount = 0;
         UUID latestSubscriptionId = null;
 
@@ -96,7 +98,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
         }
 
         @Override
-        public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final Long accountRecordId, final Long tenantRecordId) {
+        public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
             eventCount++;
             latestSubscriptionId = key.getUuidKey();
         }
@@ -118,7 +120,6 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
                 super.configure();
                 bind(Clock.class).to(ClockMock.class).asEagerSingleton();
                 bind(CallContextFactory.class).to(DefaultCallContextFactory.class).asEagerSingleton();
-                bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
                 final InvoiceConfig invoiceConfig = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
                 bind(InvoiceConfig.class).toInstance(invoiceConfig);
                 final CatalogConfig catalogConfig = new ConfigurationObjectFactory(System.getProperties()).build(CatalogConfig.class);
@@ -132,7 +133,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
                 install(new MockJunctionModule());
                 install(new EmailModule());
                 install(new TemplateModule());
-
+                install(new NotificationQueueModule());
                 final AccountInternalApi accountApi = Mockito.mock(AccountInternalApi.class);
                 bind(AccountInternalApi.class).toInstance(accountApi);
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
index b45819d..5ffb060 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -39,7 +39,7 @@ public class AutoPayRetryService extends BaseRetryService implements RetryServic
                                final PaymentConfig config,
                                final PaymentProcessor paymentProcessor,
                                final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index 2828c63..c2cda2d 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -47,16 +47,13 @@ public abstract class BaseRetryService implements RetryService {
     private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
 
     private final NotificationQueueService notificationQueueService;
-    private final PaymentConfig config;
     private final InternalCallContextFactory internalCallContextFactory;
 
     private NotificationQueue retryQueue;
 
     public BaseRetryService(final NotificationQueueService notificationQueueService,
-                            final PaymentConfig config,
                             final InternalCallContextFactory internalCallContextFactory) {
         this.notificationQueueService = notificationQueueService;
-        this.config = config;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -66,17 +63,16 @@ public abstract class BaseRetryService implements RetryService {
                                                                       getQueueName(),
                                                                       new NotificationQueueHandler() {
                                                                           @Override
-                                                                          public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                          public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                                                                               if (!(notificationKey instanceof PaymentRetryNotificationKey)) {
                                                                                   log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
                                                                                   return;
                                                                               }
                                                                               final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
-                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null);
+                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                                                                               retry(key.getUuidKey(), callContext);
                                                                           }
-                                                                      },
-                                                                      config);
+                                                                      });
     }
 
     @Override
@@ -141,9 +137,9 @@ public abstract class BaseRetryService implements RetryService {
                 final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
                 if (retryQueue != null) {
                     if (transactionalDao == null) {
-                        retryQueue.recordFutureNotification(timeOfRetry, null, key, context);
+                        retryQueue.recordFutureNotification(timeOfRetry, key, context);
                     } else {
-                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, null, key, context);
+                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key, context);
                     }
                 }
             } catch (NoSuchNotificationQueue e) {
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 250987b..c9a80a1 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -45,7 +45,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
                                      final PaymentConfig config,
                                      final PaymentProcessor paymentProcessor,
                                      final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 9217abd..2e76ea2 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -43,10 +43,9 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
 
     @Inject
     public PluginFailureRetryService(final NotificationQueueService notificationQueueService,
-                                     final PaymentConfig config,
                                      final PaymentProcessor paymentProcessor,
                                      final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
index a01dda5..0da7d2b 100644
--- a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
@@ -16,8 +16,6 @@
 
 package com.ning.billing.payment.glue;
 
-import static org.testng.Assert.assertNotNull;
-
 import java.io.IOException;
 import java.net.URL;
 import java.util.Properties;
@@ -28,7 +26,6 @@ import org.skife.config.SimplePropertyConfigSource;
 import org.skife.jdbi.v2.IDBI;
 
 import com.ning.billing.ObjectType;
-import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.mock.glue.MockInvoiceModule;
 import com.ning.billing.mock.glue.MockNotificationQueueModule;
 import com.ning.billing.payment.dao.MockPaymentDao;
@@ -37,6 +34,7 @@ import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
 import com.ning.billing.util.callcontext.CallContextSqlDao;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.MockCallContextSqlDao;
+import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.globallocker.MockGlobalLocker;
 import com.ning.billing.util.glue.BusModule;
@@ -46,9 +44,11 @@ import com.ning.billing.util.svcapi.tag.TagInternalApi;
 import com.ning.billing.util.tag.Tag;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+
+import static org.testng.Assert.assertNotNull;
 
 public class PaymentTestModuleWithMocks extends PaymentModule {
+
     public static final String PLUGIN_TEST_NAME = "my-mock";
 
     private void loadSystemPropertiesFromClasspath(final String resource) {
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index c77521d..98be956 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.bus.dao;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
@@ -29,12 +31,13 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
     private final PersistentQueueEntryLifecycleState processingState;
     private final String busEventClass;
     private final String busEventJson;
+    private final UUID userToken;
     private final Long accountRecordId;
     private final Long tenantRecordId;
 
     public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
                          final PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson,
-                         final Long accountRecordId, final Long tenantRecordId) {
+                         final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         this.id = id;
         this.createdOwner = createdOwner;
         this.owner = owner;
@@ -42,13 +45,14 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
         this.processingState = processingState;
         this.busEventClass = busEventClass;
         this.busEventJson = busEventJson;
+        this.userToken = userToken;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
     public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson,
-                         final Long accountRecordId, final Long tenantRecordId) {
-        this(0, createdOwner, null, null, null, busEventClass, busEventJson, accountRecordId, tenantRecordId);
+                         final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        this(0, createdOwner, null, null, null, busEventClass, busEventJson, userToken, accountRecordId, tenantRecordId);
     }
 
     public long getId() {
@@ -64,6 +68,11 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
     }
 
     @Override
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
     public String getOwner() {
         return owner;
     }
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 0f2ad73..3203c11 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.bus.dao;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.SQLStatement;
@@ -82,6 +83,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
         public void bind(@SuppressWarnings("rawtypes") final SQLStatement stmt, final Bind bind, final BusEventEntry evt) {
             stmt.bind("className", evt.getBusEventClass());
             stmt.bind("eventJson", evt.getBusEventJson());
+            stmt.bind("userToken", getUUIDString(evt.getUserToken()));
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -100,6 +102,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             final String className = r.getString("class_name");
             final String createdOwner = r.getString("creating_owner");
             final String eventJson = r.getString("event_json");
+            final UUID userToken = getUUID(r, "user_token");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
             final String processingOwner = r.getString("processing_owner");
             final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
@@ -107,7 +110,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             final Long tenantRecordId = r.getLong("tenant_record_id");
 
             return new BusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState, className,
-                                     eventJson, accountRecordId, tenantRecordId);
+                                     eventJson, userToken, accountRecordId, tenantRecordId);
         }
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index 859fce8..f2fa6ab 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ThreadFactory;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
     private final String hostname;
     private final InternalCallContextFactory internalCallContextFactory;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -96,16 +97,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
         this.hostname = Hostname.get();
         this.internalCallContextFactory = internalCallContextFactory;
+        this.isStarted = false;
     }
 
     @Override
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     @Override
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -131,6 +135,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
@@ -182,16 +191,15 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
     private void postFromTransaction(final BusInternalEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
         try {
             final String json = objectMapper.writeValueAsString(event);
-            final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getAccountRecordId(), context.getTenantRecordId());
+            final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
             transactional.insertBusEvent(entry, context);
         } catch (Exception e) {
             log.error("Failed to post BusEvent " + event, e);
         }
     }
 
-
     private String tweakJsonToIncludeAccountAndTenantRecordId(final String input, final Long accountRecordId, final Long tenantRecordId) {
-        int lastIndexPriorFinalBracket = input.lastIndexOf("}");
+        final int lastIndexPriorFinalBracket = input.lastIndexOf("}");
         final StringBuilder tmp = new StringBuilder(input.substring(0, lastIndexPriorFinalBracket));
         tmp.append(",\"accountRecordId\":");
         tmp.append(accountRecordId);
@@ -200,5 +208,4 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         tmp.append("}");
         return tmp.toString();
     }
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
index aad7427..30c0ebd 100644
--- a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
@@ -19,14 +19,5 @@ package com.ning.billing.util.config;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
-public interface EntitlementConfig extends NotificationConfig, KillbillConfig {
-    @Override
-    @Config("killbill.entitlement.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.entitlement.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
+public interface EntitlementConfig extends KillbillConfig {
 }
diff --git a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
index a1f8d7b..73db4fc 100644
--- a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
@@ -19,17 +19,7 @@ package com.ning.billing.util.config;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
-public interface InvoiceConfig extends NotificationConfig, KillbillConfig {
-    @Override
-    @Config("killbill.invoice.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.invoice.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
-
+public interface InvoiceConfig extends KillbillConfig {
     @Config("killbill.invoice.maxNumberOfMonthsInFuture")
     @Default("36")
     public int getNumberOfMonthsInFuture();
diff --git a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
index 16a6e4b..64ae12d 100644
--- a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
@@ -22,7 +22,7 @@ import org.skife.config.Config;
 import org.skife.config.Default;
 
 
-public interface PaymentConfig extends NotificationConfig, KillbillConfig {
+public interface PaymentConfig extends KillbillConfig {
     @Config("killbill.payment.provider.default")
     @Default("noop")
     public String getDefaultPaymentProvider();
@@ -43,16 +43,6 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig {
     @Default("8")
     public int getPluginFailureRetryMaxAttempts();
 
-    @Override
-    @Config("killbill.payment.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.payment.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
-
     @Config("killbill.payment.off")
     @Default("false")
     public boolean isPaymentOff();
diff --git a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
index 27cb985..48f2534 100644
--- a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
+++ b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
@@ -17,11 +17,17 @@
 package com.ning.billing.util.dao;
 
 import java.util.Date;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 
 public abstract class BinderBase {
+
     protected Date getDate(final DateTime dateTime) {
         return dateTime == null ? null : dateTime.toDate();
     }
+
+    protected String getUUIDString(final UUID uuid) {
+        return uuid == null ? null : uuid.toString();
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
index 557a6e1..7e4e1c9 100644
--- a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -16,15 +16,25 @@
 
 package com.ning.billing.util.glue;
 
+import org.skife.config.ConfigurationObjectFactory;
+
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.AbstractModule;
 
 public class NotificationQueueModule extends AbstractModule {
 
+
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
+    }
     @Override
     protected void configure() {
         bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 811846e..0a295ae 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,12 +55,11 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     public List<Notification> getReadyNotifications(@Bind("now") Date now,
                                                     @Bind("owner") String owner,
                                                     @Bind("max") int max,
-                                                    @Bind("queueName") String queueName,
                                                     @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
-    public List<Notification> getNotificationForAccountAndDate(@Bind("accountId") final String accountId,
+    public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
                                                                @Bind("effectiveDate") final Date effectiveDate,
                                                                @InternalTenantContextBinder final InternalTenantContext context);
 
@@ -102,7 +101,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("className", evt.getNotificationKeyClass());
-            stmt.bind("accountId", evt.getAccountId() != null ? evt.getAccountId().toString() : null);
+            // The current user token will be bound with the InternalTenantContextBinder
+            stmt.bind("futureUserToken", getUUIDString(evt.getFutureUserToken()));
             stmt.bind("notificationKey", evt.getNotificationKey());
             stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
             stmt.bind("queueName", evt.getQueueName());
@@ -123,7 +123,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final String createdOwner = r.getString("creating_owner");
             final String className = r.getString("class_name");
             final String notificationKey = r.getString("notification_key");
-            final UUID accountId = getUUID(r, "account_id");
+            final UUID userToken = getUUID(r, "user_token");
+            final UUID futureUserToken = getUUID(r, "future_user_token");
             final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDateTime(r, "effective_date");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
@@ -133,7 +134,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final Long tenantRecordId = r.getLong("tenant_record_id");
 
             return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
-                                           processingState, className, notificationKey, accountId, effectiveDate,
+                                           processingState, className, notificationKey, userToken, futureUserToken, effectiveDate,
                                            accountRecordId, tenantRecordId);
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 848ed9a..c5310a4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -32,15 +32,16 @@ public class DefaultNotification extends EntityBase implements Notification {
     private final PersistentQueueEntryLifecycleState lifecycleState;
     private final String notificationKeyClass;
     private final String notificationKey;
+    private final UUID userToken;
+    private final UUID futureUserToken;
     private final DateTime effectiveDate;
-    private final UUID accountId;
     private final Long accountRecordId;
     private final Long tenantRecordId;
 
     public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName,
                                final DateTime nextAvailableDate, final PersistentQueueEntryLifecycleState lifecycleState,
-                               final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate,
-                               final Long accountRecordId, final Long tenantRecordId) {
+                               final String notificationKeyClass, final String notificationKey, final UUID userToken, final UUID futureUserToken,
+                               final DateTime effectiveDate, final Long accountRecordId, final Long tenantRecordId) {
         super(id);
         this.ordering = ordering;
         this.owner = owner;
@@ -50,17 +51,18 @@ public class DefaultNotification extends EntityBase implements Notification {
         this.lifecycleState = lifecycleState;
         this.notificationKeyClass = notificationKeyClass;
         this.notificationKey = notificationKey;
-        this.accountId = accountId;
+        this.userToken = userToken;
+        this.futureUserToken = futureUserToken;
         this.effectiveDate = effectiveDate;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
     public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass,
-                               final String notificationKey, final UUID accountId, final DateTime effectiveDate,
+                               final String notificationKey, final UUID userToken, final UUID futureUserToken, final DateTime effectiveDate,
                                final Long accountRecordId, final Long tenantRecordId) {
         this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE,
-             notificationKeyClass, notificationKey, accountId, effectiveDate, accountRecordId, tenantRecordId);
+             notificationKeyClass, notificationKey, userToken, futureUserToken, effectiveDate, accountRecordId, tenantRecordId);
     }
 
     @Override
@@ -128,8 +130,13 @@ public class DefaultNotification extends EntityBase implements Notification {
     }
 
     @Override
-    public UUID getAccountId() {
-        return accountId;
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
+    public UUID getFutureUserToken() {
+        return futureUserToken;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 6f58a2e..2780faa 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,156 +17,162 @@
 package com.ning.billing.util.notificationq;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
-
 import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.util.Hostname;
 import com.ning.billing.util.config.NotificationConfig;
-import com.ning.billing.util.callcontext.CallOrigin;
+
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
-public class DefaultNotificationQueue extends NotificationQueueBase {
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+
+public class DefaultNotificationQueue implements NotificationQueue {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
+    private final IDBI dbi;
     private final NotificationSqlDao dao;
-    private final InternalCallContextFactory internalCallContextFactory;
+    private final String hostname;
 
-    public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
-                                    final NotificationQueueHandler handler, final NotificationConfig config,
-                                    final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, svcName, queueName, handler, config);
-        this.dao = dbi.onDemand(NotificationSqlDao.class);
-        this.internalCallContextFactory = internalCallContextFactory;
-    }
+    private final String svcName;
+    private final String queueName;
 
-    @Override
-    public int doProcessEvents() {
-        logDebug("ENTER doProcessEvents");
-        // Finding and claiming notifications is not done per tenant (yet?)
-        final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
-        if (notifications.size() == 0) {
-            logDebug("EXIT doProcessEvents");
-            return 0;
-        }
+    private final ObjectMapper objectMapper;
 
-        logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
-        int result = 0;
-        for (final Notification cur : notifications) {
-            getNbProcessedEvents().incrementAndGet();
-            logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            result++;
-            clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
-            logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-        }
+    private final NotificationQueueHandler handler;
+
+    private final NotificationQueueService notificationQueueService;
+
+    private volatile boolean isStarted;
 
-        return result;
+    public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+                                    final IDBI dbi, final NotificationQueueService notificationQueueService) {
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.dbi = dbi;
+        this.dao = dbi.onDemand(NotificationSqlDao.class);
+        this.hostname = Hostname.get();
+        this.notificationQueueService = notificationQueueService;
+        this.objectMapper = new ObjectMapper();
     }
 
+
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime,
-                                         final UUID accountId,
                                          final NotificationKey notificationKey,
                                          final InternalCallContext context) throws IOException {
-        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, dao, context);
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao, context);
     }
 
     @Override
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
                                                         final DateTime futureNotificationTime,
-                                                        final UUID accountId,
                                                         final NotificationKey notificationKey,
                                                         final InternalCallContext context) throws IOException {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
-        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, transactionalNotificationDao, context);
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao, context);
     }
 
     private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
-                                                  final UUID accountId,
                                                   final NotificationKey notificationKey,
                                                   final NotificationSqlDao thisDao,
                                                   final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
-                                                                  accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
-        thisDao.insertNotification(notification, context);
-    }
+        final UUID futureUserToken = UUID.randomUUID();
 
-    private void clearNotification(final Notification cleared, final InternalCallContext context) {
-        dao.clearNotification(cleared.getId().toString(), getHostname(), context);
+        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
+                                                                  context.getUserToken(), futureUserToken, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
+        thisDao.insertNotification(notification, context);
     }
 
-    private List<Notification> getReadyNotifications(final InternalCallContext context) {
-        final Date now = getClock().getUTCNow().toDate();
-        final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
-        final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
 
-        final List<Notification> claimedNotifications = new ArrayList<Notification>();
-        for (final Notification cur : input) {
-            logDebug("about to claim notification %s,  key = %s for time %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
 
-            final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
-            logDebug("claimed notification %s, key = %s for time %s result = %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
+    @Override
+    public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
+        dao.removeNotificationsByKey(notificationKey.toString(), context);
+    }
 
-            if (claimed) {
-                claimedNotifications.add(cur);
-                dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
+    @Override
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+        // TODO we have the same use case in InternalCallContextFactory, do we need some sort of helper class?
+        final Long accountRecordId = dbi.withHandle(new HandleCallback<Long>() {
+            @Override
+            public Long withHandle(final Handle handle) throws Exception {
+                final List<Map<String, Object>> values = handle.select("select record_id from accounts where id = " + accountId.toString());
+                if (values.size() == 0) {
+                    return null;
+                } else {
+                    return (Long) values.get(0).get("record_id");
+                }
             }
-        }
+        });
 
-        for (final Notification cur : claimedNotifications) {
-            if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
-                log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
-            }
+        if (accountId == null) {
+            return ImmutableList.<Notification>of();
+        } else {
+            return dao.getNotificationForAccountAndDate(accountRecordId, effectiveDate.toDate(), context);
         }
+    }
 
-        return claimedNotifications;
+    @Override
+    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+        dao.removeNotification(notificationId.toString(), context);
     }
 
-    private void logDebug(final String format, final Object... args) {
-        if (log.isDebugEnabled()) {
-            final String realDebug = String.format(format, args);
-            log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
-        }
+    @Override
+    public String getFullQName() {
+        return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
-        dao.removeNotificationsByKey(notificationKey.toString(), context);
+    public String getServiceName() {
+        return svcName;
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        return dao.getNotificationForAccountAndDate(accountId.toString(), effectiveDate.toDate(), context);
+    public String getQueueName() {
+        return queueName;
     }
 
     @Override
-    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
-        dao.removeNotification(notificationId.toString(), context);
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        notificationQueueService.startQueue();
+        isStarted = true;
     }
 
-    private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
-        return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+    @Override
+    public void stopQueue() {
+        // Order matters...
+        isStarted = false;
+        notificationQueueService.stopQueue();
     }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..d56023c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 import com.google.inject.Inject;
 
 public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
     private final IDBI dbi;
-    private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
-    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
-        super(clock);
+    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
+                                           final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi, internalCallContextFactory);
         this.dbi = dbi;
-        this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     protected NotificationQueue createNotificationQueueInternal(final String svcName,
                                                                 final String queueName,
-                                                                final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+                                                                final NotificationQueueHandler handler) {
+        return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index a790d15..4c28ff5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -35,6 +35,8 @@ public interface Notification extends PersistentQueueEntryLifecycle, Entity {
 
     public String getQueueName();
 
-    // TODO - do we still need it now we have account_record_id?
-    public UUID getAccountId();
+    // Future user token, i.e. user token of the context when this notification will be claimed.
+    // The user token can be used as a trace to follow events (e.g. all bus events triggered as a result of a
+    // claimed notification will share the same user token)
+    public UUID getFutureUserToken();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index e592674..58b49c4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -21,12 +21,11 @@ import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.entity.dao.EntityDao;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.queue.QueueLifecycle;
 
 public interface NotificationQueue extends QueueLifecycle {
@@ -38,7 +37,6 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param notificationKey        the key for that notification
      */
     public void recordFutureNotification(final DateTime futureNotificationTime,
-                                         final UUID accountId,
                                          final NotificationKey notificationKey,
                                          final InternalCallContext context)
             throws IOException;
@@ -52,7 +50,6 @@ public interface NotificationQueue extends QueueLifecycle {
      */
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
                                                         final DateTime futureNotificationTime,
-                                                        final UUID accountId,
                                                         final NotificationKey notificationKey,
                                                         final InternalCallContext context)
             throws IOException;
@@ -70,13 +67,6 @@ public interface NotificationQueue extends QueueLifecycle {
     public void removeNotification(final UUID notificationId,
                                    final InternalCallContext context);
 
-    /**
-     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
-     * In which case, it will callback users for all the ready notifications.
-     *
-     * @return the number of entries we processed
-     */
-    public int processReadyNotification();
 
     /**
      * @return the name of that queue
@@ -92,4 +82,6 @@ public interface NotificationQueue extends QueueLifecycle {
      * @return the queue name associated
      */
     public String getQueueName();
+
+    public NotificationQueueHandler getHandler();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d6df6dd..cae7775 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,11 +16,14 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
 
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
 
     public interface NotificationQueueHandler {
 
@@ -28,10 +31,11 @@ public interface NotificationQueueService {
          * Called for each notification ready
          *
          * @param notificationKey the notification key associated to that notification entry
+         * @param userToken user token associated with that notification entry
          * @param accountRecordId account record id associated with that notification entry
          * @param tenantRecordId  tenant record id associated with that notification entry
          */
-        public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, Long accountRecordId, Long tenantRecordId);
+        public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, UUID userToken, Long accountRecordId, Long tenantRecordId);
     }
 
     public static final class NotificationQueueAlreadyExists extends Exception {
@@ -58,12 +62,11 @@ public interface NotificationQueueService {
      * @param svcName   the name of the service using that queue
      * @param queueName a name for that queue (unique per service)
      * @param handler   the handler required for notifying the caller of state change
-     * @param config    the notification queue configuration
      * @return a new NotificationQueue
      * @throws com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists
      *          is the queue associated with that service and name already exits
      */
-    public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+    public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler)
             throws NotificationQueueAlreadyExists;
 
     /**
@@ -89,8 +92,7 @@ public interface NotificationQueueService {
             throws NoSuchNotificationQueue;
 
     /**
-     * @param services
      * @return the number of processed notifications
      */
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+    public int triggerManualQueueProcessing(final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..99dd74f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -17,39 +17,29 @@
 package com.ning.billing.util.notificationq;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.skife.jdbi.v2.IDBI;
 
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 
-import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
-    protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
 
-    protected final Clock clock;
-
-    private final Map<String, NotificationQueue> queues;
 
     @Inject
-    public NotificationQueueServiceBase(final Clock clock) {
-        this.clock = clock;
-        this.queues = new TreeMap<String, NotificationQueue>();
+    public NotificationQueueServiceBase(final Clock clock, final NotificationQueueConfig config, final IDBI dbi,
+                                        final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi, internalCallContextFactory);
     }
 
     @Override
     public NotificationQueue createNotificationQueue(final String svcName,
                                                      final String queueName,
-                                                     final NotificationQueueHandler handler,
-                                                     final NotificationConfig config) throws NotificationQueueAlreadyExists {
-        if (svcName == null || queueName == null || handler == null || config == null) {
+                                                     final NotificationQueueHandler handler) throws NotificationQueueAlreadyExists {
+        if (svcName == null || queueName == null || handler == null) {
             throw new RuntimeException("Need to specify all parameters");
         }
 
@@ -61,7 +51,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
                 throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
                                                                        svcName, queueName));
             }
-            result = createNotificationQueueInternal(svcName, queueName, handler, config);
+            result = createNotificationQueueInternal(svcName, queueName, handler);
             queues.put(compositeName, result);
         }
         return result;
@@ -96,33 +86,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         }
     }
 
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("NotificationQueueServiceBase");
+        sb.append("{queues=").append(queues);
+        sb.append('}');
+        return sb.toString();
+    }
+
+
     //
     // Test ONLY
     //
     @Override
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+    public int triggerManualQueueProcessing(final Boolean keepRunning) {
 
         int result = 0;
 
-        List<NotificationQueue> manualQueues = null;
-        if (services == null) {
-            manualQueues = new ArrayList<NotificationQueue>(queues.values());
-        } else {
-            final Joiner join = Joiner.on(",");
-            join.join(services);
-
-            log.info("Trigger manual processing for services {} ", join.toString());
-            manualQueues = new LinkedList<NotificationQueue>();
-            synchronized (queues) {
-                for (final String svc : services) {
-                    addQueuesForService(manualQueues, svc);
-                }
-            }
-        }
+        List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
         for (final NotificationQueue cur : manualQueues) {
             int processedNotifications = 0;
             do {
-                processedNotifications = cur.processReadyNotification();
+                doProcessEventsWithLimit(1);
                 log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
                 result += processedNotifications;
             } while (keepRunning && processedNotifications > 0);
@@ -130,28 +116,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         return result;
     }
 
-    private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
-        for (final String cur : queues.keySet()) {
-            if (cur.startsWith(svcName)) {
-                result.add(queues.get(cur));
-            }
-        }
-    }
-
     protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
-                                                                         String queueName, NotificationQueueHandler handler,
-                                                                         NotificationConfig config);
-
-    public static String getCompositeName(final String svcName, final String queueName) {
-        return svcName + ":" + queueName;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("NotificationQueueServiceBase");
-        sb.append("{queues=").append(queues);
-        sb.append('}');
-        return sb.toString();
-    }
+                                                                         String queueName, NotificationQueueHandler handler);
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     private final int nbThreads;
     private final Executor executor;
-    private final String svcName;
+    private final String svcQName;
     private final long sleepTimeMs;
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
     protected final ObjectMapper objectMapper;
-    
-    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+    public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
-        this.svcName = svcName;
+        this.svcQName = svcQName;
         this.sleepTimeMs = config.getSleepTimeMs();
         this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
     }
 
+
     @Override
     public void startQueue() {
 
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
 
         log.info(String.format("%s: Starting with %d threads",
-                               svcName, nbThreads));
+                               svcQName, nbThreads));
 
         for (int i = 0; i < nbThreads; i++) {
             executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                 public void run() {
 
                     log.info(String.format("%s: Thread %s [%d] starting",
-                                           svcName,
+                                           svcQName,
                                            Thread.currentThread().getName(),
                                            Thread.currentThread().getId()));
 
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                                 doProcessEvents();
                             } catch (Exception e) {
                                 log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...",
-                                                       svcName,
+                                                       svcQName,
                                                        Thread.currentThread().getName(),
                                                        Thread.currentThread().getId()), e);
                             }
                             sleepALittle();
                         }
                     } catch (InterruptedException e) {
-                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
                     } catch (Throwable e) {
-                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
                     } finally {
 
-                        log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
                         synchronized (thePersistentQ) {
                             curActiveThreads--;
                         }
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
             if (!success) {
 
-                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             } else {
-                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             }
         } catch (InterruptedException e) {
-            log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+            log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
         }
     }
 
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             }
 
         } catch (InterruptedException ignore) {
-            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
         } finally {
             if (remaining > 0) {
-                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             } else {
-                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             }
             curActiveThreads = 0;
         }
     }
-    
+
     protected <T> T deserializeEvent(final String className, final String json) {
         try {
             final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         }
     }
 
+    public abstract int doProcessEvents();
 
+    public abstract boolean isStarted();
 
-
-    @Override
-    public abstract int doProcessEvents();
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
index ea4ab88..922ed34 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.queue;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 public interface PersistentQueueEntryLifecycle {
@@ -40,4 +42,8 @@ public interface PersistentQueueEntryLifecycle {
     public PersistentQueueEntryLifecycleState getProcessingState();
 
     public boolean isAvailableForProcessing(DateTime now);
+
+    // User token associated with this bus event or notification (i.e. user token from the context that
+    // was used to generate this PersistentQueueEntryLifecycle)
+    public UUID getUserToken();
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
      */
     public void stopQueue();
 
-    /**
-     * Processes event from queue
-     */
-    public int doProcessEvents();
+    public boolean isStarted();
 }
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 5d67ee3..18ec252 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -8,6 +8,7 @@ getNextBusEventEntry() ::= <<
       record_id
       , class_name
       , event_json
+      , user_token
       , created_date
       , creating_owner
       , processing_owner
@@ -64,6 +65,7 @@ insertBusEvent() ::= <<
     insert into bus_events (
       class_name
     , event_json
+    , user_token
     , created_date
     , creating_owner
     , processing_owner
@@ -74,6 +76,7 @@ insertBusEvent() ::= <<
     ) values (
       :className
     , :eventJson
+    , :userToken
     , :createdDate
     , :creatingOwner
     , :processingOwner
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 157001b..2a19753 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -130,8 +130,9 @@ CREATE TABLE notifications (
     id char(36) NOT NULL,
     created_date datetime NOT NULL,
     class_name varchar(256) NOT NULL,
-    account_id  char(36),
     notification_key varchar(2048) NOT NULL,
+    user_token char(36),
+    future_user_token char(36),
     creating_owner char(50) NOT NULL,
     effective_date datetime NOT NULL,
     queue_name char(64) NOT NULL,
@@ -143,7 +144,7 @@ CREATE TABLE notifications (
     PRIMARY KEY(record_id)
 );
 CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
 CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
@@ -185,8 +186,9 @@ CREATE INDEX audit_log_tenant_account_record_id ON audit_log(tenant_record_id, a
 DROP TABLE IF EXISTS bus_events;
 CREATE TABLE bus_events (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
-    class_name varchar(128) NOT NULL, 
-    event_json varchar(2048) NOT NULL,     
+    class_name varchar(128) NOT NULL,
+    event_json varchar(2048) NOT NULL,
+    user_token char(36),
     created_date datetime NOT NULL,
     creating_owner char(50) NOT NULL,
     processing_owner char(50) DEFAULT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index f0d9187..895ea5d 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -8,8 +8,9 @@ getReadyNotifications() ::= <<
       record_id
       , id
       , class_name
-      , account_id
       , notification_key
+      , user_token
+      , future_user_token
       , created_date
       , creating_owner
       , effective_date
@@ -22,7 +23,6 @@ getReadyNotifications() ::= <<
     from notifications
     where
       effective_date \<= :now
-      and queue_name = :queueName
       and processing_state != 'PROCESSED'
       and processing_state != 'REMOVED'
       and (processing_owner IS NULL OR processing_available_date \<= :now)
@@ -36,11 +36,12 @@ getReadyNotifications() ::= <<
 
 getNotificationForAccountAndDate() ::= <<
    select
-     record_id
+       record_id
      , id
      , class_name
-     , account_id
      , notification_key
+     , user_token
+     , future_user_token
      , created_date
      , creating_owner
      , effective_date
@@ -52,7 +53,7 @@ getNotificationForAccountAndDate() ::= <<
      , tenant_record_id
    from notifications
    where
-   account_id = :accountId AND effective_date = :effectiveDate
+   account_record_id = :accountRecordId AND effective_date = :effectiveDate
    ;
 >>
 
@@ -101,8 +102,9 @@ insertNotification() ::= <<
     insert into notifications (
       id
       , class_name
-      , account_id
       , notification_key
+      , user_token
+      , future_user_token
       , created_date
       , creating_owner
       , effective_date
@@ -115,8 +117,9 @@ insertNotification() ::= <<
     ) values (
       :id
       , :className
-      , :accountId
       , :notificationKey
+      , :userToken
+      , :futureUserToken
       , :createdDate
       , :creatingOwner
       , :effectiveDate
diff --git a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
index aed75f5..93ef001 100644
--- a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
+++ b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
@@ -16,16 +16,24 @@
 
 package com.ning.billing.mock.glue;
 
+import org.skife.config.ConfigurationObjectFactory;
+
 import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.AbstractModule;
 
 public class MockNotificationQueueModule extends AbstractModule {
 
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
+    }
     @Override
     protected void configure() {
         bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
     }
 
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 2634e62..a3266d9 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -26,7 +26,6 @@ import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
 import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -45,7 +44,6 @@ import static org.testng.Assert.assertNotNull;
 @Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
 public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
 
-    private static final UUID accountId = UUID.randomUUID();
     private static final String hostname = "Yop";
 
     @Inject
@@ -58,31 +56,20 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
         dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
-    @BeforeTest(groups = "slow")
-    public void cleanupDb() {
-        dbi.withHandle(new HandleCallback<Void>() {
-            @Override
-            public Void withHandle(final Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                return null;
-            }
-        });
-    }
-
     @Test(groups = "slow")
     public void testBasic() throws InterruptedException {
+        final long accountRecordId = 1242L;
         final String ownerId = UUID.randomUUID().toString();
 
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                           null, internalCallContext.getTenantRecordId());
+        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                           accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif, internalCallContext);
 
         Thread.sleep(1000);
         final DateTime now = new DateTime();
-        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
         assertNotNull(notifications);
         assertEquals(notifications.size(), 1);
 
@@ -117,24 +104,25 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
 
     @Test(groups = "slow")
     public void testGetByAccountAndDate() throws InterruptedException {
+        final long accountRecordId = 1242L;
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                            null, internalCallContext.getTenantRecordId());
+        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                            accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif1, internalCallContext);
 
-        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                            null, internalCallContext.getTenantRecordId());
+        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                            accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif2, internalCallContext);
 
-        List<Notification> notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+        List<Notification> notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
         assertEquals(notifications.size(), 2);
         for (final Notification cur : notifications) {
             Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
             dao.removeNotification(cur.getId().toString(), internalCallContext);
         }
 
-        notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+        notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
         assertEquals(notifications.size(), 2);
         for (final Notification cur : notifications) {
             Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
@@ -149,8 +137,9 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
                                           " record_id " +
                                           ", id" +
                                           ", class_name" +
-                                          ", account_id" +
                                           ", notification_key" +
+                                          ", user_token" +
+                                          ", future_user_token" +
                                           ", created_date" +
                                           ", creating_owner" +
                                           ", effective_date" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index ec917f2..ac267a7 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -24,25 +24,39 @@ import java.util.TreeSet;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private final String hostname;
     private final TreeSet<Notification> notifications;
+    private final Clock clock;
+    private final String svcName;
+    private final String queueName;
+    private final NotificationQueueHandler handler;
+    private final MockNotificationQueueService queueService;
+
+    private volatile boolean isStarted;
+
+    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final MockNotificationQueueService mockNotificationQueueService) {
+
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.clock = clock;
+        this.hostname = Hostname.get();
+        this.queueService = mockNotificationQueueService;
 
-    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
-        super(clock, svcName, queueName, handler, config);
         notifications = new TreeSet<Notification>(new Comparator<Notification>() {
             @Override
             public int compare(final Notification o1, final Notification o2) {
@@ -56,11 +70,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
-                                         final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
-                                                                  null, 0L);
+        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, context.getUserToken(),
+                                                                  UUID.randomUUID(), futureNotificationTime, null, 0L);
         synchronized (notifications) {
             notifications.add(notification);
         }
@@ -68,85 +81,116 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
 
     @Override
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final DateTime futureNotificationTime,
-                                                        final UUID accountId, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
-        recordFutureNotification(futureNotificationTime, accountId, notificationKey, context);
+                                                        final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+        recordFutureNotification(futureNotificationTime, notificationKey, context);
     }
 
-    public List<Notification> getPendingEvents() {
-        final List<Notification> result = new ArrayList<Notification>();
 
+    @Override
+    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+        final List<Notification> toClearNotifications = new ArrayList<Notification>();
         for (final Notification notification : notifications) {
-            if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
-                result.add(notification);
+            if (notification.getNotificationKey().equals(key.toString())) {
+                toClearNotifications.add(notification);
             }
         }
 
-        return result;
+        synchronized (notifications) {
+            if (toClearNotifications.size() > 0) {
+                notifications.removeAll(toClearNotifications);
+            }
+        }
     }
 
     @Override
-    public int doProcessEvents() {
-        final int result;
-        final List<Notification> processedNotifications = new ArrayList<Notification>();
-        final List<Notification> oldNotifications = new ArrayList<Notification>();
-
-        final List<Notification> readyNotifications = new ArrayList<Notification>();
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+        /*
+        final List<Notification> result = new ArrayList<Notification>();
         synchronized (notifications) {
-            for (final Notification cur : notifications) {
-                if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
-                    readyNotifications.add(cur);
+            for (Notification cur : notifications) {
+                if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+                    result.add(cur);
                 }
             }
         }
+        return result;
+        */
+        return null;
+    }
 
-        result = readyNotifications.size();
-        for (final Notification cur : readyNotifications) {
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
-                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
-                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
-                                                                                      cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
-                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
-            oldNotifications.add(cur);
-            processedNotifications.add(processedNotification);
-        }
 
+    @Override
+    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
         synchronized (notifications) {
-            if (oldNotifications.size() > 0) {
-                notifications.removeAll(oldNotifications);
-            }
-
-            if (processedNotifications.size() > 0) {
-                notifications.addAll(processedNotifications);
+            for (Notification cur : notifications) {
+                if (cur.getId().equals(notificationId)) {
+                    notifications.remove(cur);
+                    break;
+                }
             }
         }
 
-        return result;
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
-        final List<Notification> toClearNotifications = new ArrayList<Notification>();
-        for (final Notification notification : notifications) {
-            if (notification.getNotificationKey().equals(key.toString())) {
-                toClearNotifications.add(notification);
-            }
-        }
+    public String getFullQName() {
+        return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+    }
 
-        synchronized (notifications) {
-            if (toClearNotifications.size() > 0) {
-                notifications.removeAll(toClearNotifications);
-            }
-        }
+    @Override
+    public String getServiceName() {
+        return svcName;
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        return null;
+    public String getQueueName() {
+        return queueName;
     }
 
     @Override
-    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        isStarted = true;
+        queueService.startQueue();
+    }
+
+    @Override
+    public void stopQueue() {
+        isStarted = false;
+        queueService.stopQueue();
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
+
+    public List<Notification> getReadyNotifications() {
+        final int result;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        final List<Notification> readyNotifications = new ArrayList<Notification>();
+        synchronized (notifications) {
+            for (final Notification cur : notifications) {
+                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+                    readyNotifications.add(cur);
+                }
+            }
+        }
+        return readyNotifications;
     }
+
+    public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded) {
+        synchronized (notifications) {
+            notifications.removeAll(toBeremoved);
+            notifications.addAll(toBeAdded);
+        }
+    }
+
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..0284747 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
 
 package com.ning.billing.util.notificationq;
 
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import com.google.inject.Inject;
 
 public class MockNotificationQueueService extends NotificationQueueServiceBase {
 
     @Inject
-    public MockNotificationQueueService(final Clock clock) {
-        super(clock);
+    public MockNotificationQueueService(final Clock clock, final NotificationQueueConfig config) {
+        super(clock, config, null, null);
+    }
+
+
+    @Override
+    protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+                                                                final NotificationQueueHandler handler) {
+        return new MockNotificationQueue(clock, svcName, queueName, handler, this);
     }
 
+
     @Override
-    protected NotificationQueue createNotificationQueueInternal(final String svcName,
-                                                                final String queueName, final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+    public int doProcessEvents() {
+
+        int result = 0;
+
+        for (NotificationQueue cur : queues.values()) {
+            result += doProcessEventsForQueue((MockNotificationQueue) cur);
+        }
+        return result;
+    }
+
+    private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+        int result = 0;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        List<Notification> readyNotifications = queue.getReadyNotifications();
+        for (final Notification cur : readyNotifications) {
+            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+            queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+                                                                                      cur.getNotificationKey(), cur.getUserToken(), cur.getFutureUserToken(), cur.getEffectiveDate(),
+                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
+            oldNotifications.add(cur);
+            processedNotifications.add(processedNotification);
+            result++;
+        }
+
+        queue.markProcessedNotifications(oldNotifications, processedNotifications);
+        return result;
     }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 31834b2..d12cccf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -24,9 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -36,10 +34,8 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -65,8 +61,6 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
     private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
 
-    private static final UUID accountId = UUID.randomUUID();
-
     private EntitySqlDaoTransactionalJdbiWrapper entitySqlDaoTransactionalJdbiWrapper;
 
     @Inject
@@ -75,7 +69,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     @Inject
     private Clock clock;
 
-    private DummySqlTest dao;
+    @Inject
+    NotificationQueueService queueService;
 
     private int eventsReceived;
 
@@ -97,28 +92,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         public int compareTo(TestNotificationKey arg0) {
             return value.compareTo(arg0.value);
         }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(value);
+            return sb.toString();
+        }
     }
 
     @BeforeSuite(groups = "slow")
     public void setup() throws Exception {
         final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
         helper.initDb(testDdl);
-        dao = dbi.onDemand(DummySqlTest.class);
         entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(dbi);
     }
 
     @BeforeTest(groups = "slow")
     public void beforeTest() {
-        dbi.withHandle(new HandleCallback<Void>() {
-
-            @Override
-            public Void withHandle(final Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                handle.execute("delete from dummy");
-                return null;
-            }
-        });
         // Reset time to real value
         ((ClockMock) clock).resetDeltaFromReality();
         eventsReceived = 0;
@@ -135,20 +126,19 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        log.info("Handler received key: " + notificationKey);
-
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 1, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "foo",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey);
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             });
 
         queue.startQueue();
 
@@ -161,50 +151,69 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         expectedNotifications.put(notificationKey, Boolean.FALSE);
 
         // Insert dummy to be processed in 2 sec'
-        entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+        entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>()
+
+        {
             @Override
-            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+            public Void inTransaction(
+                    final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws
+                                                                                               Exception {
 
                 entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKey, internalCallContext);
                 log.info("Posted key: " + notificationKey);
 
                 return null;
             }
-        });
+        }
+
+                                                    );
 
         // Move time in the future after the notification effectiveDate
-        ((ClockMock) clock).setDeltaFromReality(3000);
+        ((ClockMock) clock).
+
+                                   setDeltaFromReality(3000);
 
         // Notification should have kicked but give it at least a sec' for thread scheduling
-        await().atMost(1, MINUTES).until(new Callable<Boolean>() {
-            @Override
-            public Boolean call() throws Exception {
-                return expectedNotifications.get(notificationKey);
-            }
-        });
+        await()
+
+                .
+
+                        atMost(1, MINUTES)
+
+                .
+
+                        until(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws
+                                                  Exception {
+                                return expectedNotifications.get(notificationKey);
+                            }
+                        }
+
+                             );
 
         queue.stopQueue();
         Assert.assertTrue(expectedNotifications.get(notificationKey));
     }
 
     @Test(groups = "slow")
-    public void testManyNotifications() throws InterruptedException {
+    public void testManyNotifications() throws Exception {
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
-
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "many",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey.toString());
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             });
         queue.startQueue();
 
         final DateTime now = clock.getUTCNow();
@@ -217,7 +226,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             final DummyObject obj = new DummyObject("foo", key);
             final int currentIteration = i;
 
-            final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+            final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
             expectedNotifications.put(notificationKey, Boolean.FALSE);
 
             entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -226,7 +235,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
                     entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
                     queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, now.plus((currentIteration + 1) * nextReadyTimeIncrementMs),
-                                                                  accountId, notificationKey, internalCallContext);
+                                                                  notificationKey, internalCallContext);
                     return null;
                 }
             });
@@ -255,7 +264,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
                     success = true;
                     break;
                 }
-                //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+                log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
                 expectedNotifications.wait(1000);
             }
         } while (nbTry-- > 0);
@@ -281,40 +290,23 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
         final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
 
-        final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
-
-        final NotificationConfig config = new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return false;
-            }
-
+        final NotificationQueue queueFred = queueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
             @Override
-            public long getSleepTimeMs() {
-                return 10;
-            }
-        };
-
-        final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
-            @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Fred received key: " + notificationKey);
                 expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
-        },
-                                                                                             config);
+        });
 
-        final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+        final NotificationQueue queueBarney = queueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Barney received key: " + notificationKey);
                 expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
-        },
-                                                                                               config);
-
+        });
         queueFred.startQueue();
         //		We don't start Barney so it can never pick up notifications
 
@@ -335,9 +327,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
 
-                queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyFred, internalCallContext);
+                queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyFred, internalCallContext);
                 log.info("posted key: " + notificationKeyFred.toString());
-                queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyBarney, internalCallContext);
+                queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyBarney, internalCallContext);
                 log.info("posted key: " + notificationKeyBarney.toString());
                 return null;
             }
@@ -365,40 +357,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
     }
 
-    NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
-        return new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return off;
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return sleepTime;
-            }
-        };
-    }
-
     @Test(groups = "slow")
-    public void testRemoveNotifications() throws InterruptedException {
+    public void testRemoveNotifications() throws Exception {
         final UUID key = UUID.randomUUID();
         final NotificationKey notificationKey = new TestNotificationKey(key.toString());
         final UUID key2 = UUID.randomUUID();
         final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
-                                                                                        log.info("Received notification with key: " + notificationKey);
-                                                                                        eventsReceived++;
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
-
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "remove",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+                                                                                         log.info("Received notification with key: " + notificationKey);
+                                                                                         eventsReceived++;
+                                                                                     }
+                                                                                 }
+                                                                             });
         queue.startQueue();
 
         final DateTime start = clock.getUTCNow().plusHours(1);
@@ -409,9 +385,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), accountId, notificationKey2, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), notificationKey2, internalCallContext);
                 return null;
             }
         });
@@ -436,16 +412,32 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         queue.stopQueue();
     }
 
+    static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
+        return new NotificationQueueConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return off;
+            }
+
+            @Override
+            public long getSleepTimeMs() {
+                return sleepTime;
+            }
+        };
+    }
+
     public static class TestNotificationQueueModule extends AbstractModule {
 
         @Override
         protected void configure() {
-            bind(Clock.class).to(ClockMock.class);
+            bind(Clock.class).to(ClockMock.class).asEagerSingleton();
 
             final IDBI dbi = getDBI();
             bind(IDBI.class).toInstance(dbi);
             final IDBI otherDbi = getDBI();
             bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+            bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+            bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
         }
     }
 }