killbill-memoizeit

Changes

entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionFactory.java 63(+0 -63)

Details

diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7bd0ae1..c80e303 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -44,7 +44,6 @@ import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -58,9 +57,11 @@ import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.config.CatalogConfig;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 
+
 public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
 
     private BusinessAccountTagSqlDao accountTagSqlDao;
@@ -73,6 +74,19 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
     private EntitlementUserApi entitlementUserApi;
     private BusinessTagDao tagDao;
 
+
+    private NotificationQueueConfig config = new NotificationQueueConfig() {
+        @Override
+        public boolean isNotificationProcessingOff() {
+            return false;
+        }
+
+        @Override
+        public long getSleepTimeMs() {
+            return 3000;
+        }
+    };
+
     @BeforeMethod(groups = "slow")
     public void setUp() throws Exception {
         final Clock clock = new ClockMock();
@@ -89,13 +103,12 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
         accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao);
         final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
         final AddonUtils addonUtils = new AddonUtils(catalogService);
-        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
         final EntitlementDao entitlementDao = new DefaultEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
         final PlanAligner planAligner = new PlanAligner(catalogService);
-        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
-        final DefaultSubscriptionFactory subscriptionFactory = new DefaultSubscriptionFactory(apiService, clock, catalogService);
-        entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, subscriptionFactory);
-        entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, subscriptionFactory, addonUtils, internalCallContextFactory);
+        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+        entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, apiService, clock, catalogService);
+        entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, addonUtils, internalCallContextFactory);
         tagDao = new BusinessTagDao(accountTagSqlDao, invoicePaymentTagSqlDao, invoiceTagSqlDao, subscriptionTransitionTagSqlDao,
                                     accountApi, entitlementApi);
 
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index e04fed7..5ecdbb6 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -292,4 +292,12 @@ public interface InvoiceUserApi {
      * @throws InvoiceApiException
      */
     public String getInvoiceAsHTML(UUID invoiceId, TenantContext context) throws AccountApiException, IOException, InvoiceApiException;
+
+    /**
+     * Rebalance CBA for account which have credit and unpaid invoices-- only needed if system is configured to not rebalance automatically.
+     *
+     * @param accountId account id
+     * @param context the callcontext
+     */
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context);
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
     private final InternalCallContextFactory internalCallContextFactory;
     private final AccountInternalApi accountApi;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
 
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
         try {
             final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index 38b3a50..fcd10f8 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -179,17 +179,17 @@ public class TestOverdueIntegration extends TestOverdueBase {
                                             new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 31), InvoiceItemType.REPAIR_ADJ, new BigDecimal("-249.95")),
                                             new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("249.95")));
         invoiceChecker.checkInvoice(account.getId(), 4, callContext,
-                                    // Note the end date here is not 07-25, but 07-9. The overdue configuration disabled invoicing between 07-09 and 07-23 (e.g. the bundle
+                                    // Note the end date here is not 07-25, but 07-10. The overdue configuration disabled invoicing between 07-10 and 07-23 (e.g. the bundle
                                     // was inaccessible, hence we didn't want to charge the customer for that period, even though the account was overdue).
-                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 9), InvoiceItemType.RECURRING, new BigDecimal("74.99")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 10), InvoiceItemType.RECURRING, new BigDecimal("83.31")),
                                     // Item for the upgraded recurring plan
                                     new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 31), InvoiceItemType.RECURRING, new BigDecimal("154.85")),
                                     // Credits consumed
-                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-229.84")));
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-238.16")));
         invoiceChecker.checkChargedThroughDate(baseSubscription.getId(), new LocalDate(2012, 7, 31), callContext);
 
         // Verify the account balance: 249.95 - 74.99 - 154.85
-        assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-20.11")), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-11.79")), 0);
     }
 
     @Test(groups = "slow")
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
new file mode 100644
index 0000000..39d636b
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.entitlement.events.EntitlementEvent;
+import com.ning.billing.util.clock.Clock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+public class EntitlementApiBase {
+
+    protected final EntitlementDao dao;
+
+    protected final SubscriptionApiService apiService;
+    protected final Clock clock;
+    protected final CatalogService catalogService;
+
+    public EntitlementApiBase(final EntitlementDao dao, final SubscriptionApiService apiService, final Clock clock, final CatalogService catalogService) {
+        this.dao = dao;
+        this.apiService = apiService;
+        this.clock = clock;
+        this.catalogService = catalogService;
+    }
+
+    protected List<Subscription> createSubscriptionsForApiUse(final List<Subscription> internalSubscriptions) {
+        return new ArrayList<Subscription>(Collections2.transform(internalSubscriptions, new Function<Subscription, Subscription>() {
+            @Override
+            public Subscription apply(final Subscription subscription) {
+                return createSubscriptionForApiUse((SubscriptionData) subscription);
+            }
+        }));
+    }
+
+    protected SubscriptionData createSubscriptionForApiUse(final Subscription internalSubscription) {
+        return new SubscriptionData((SubscriptionData) internalSubscription, apiService, clock);
+    }
+
+    protected SubscriptionData createSubscriptionForApiUse(SubscriptionBuilder builder, List<EntitlementEvent> events) {
+        final SubscriptionData subscription = new SubscriptionData(builder, apiService, clock);
+        if (events.size() > 0) {
+            subscription.rebuildTransitions(events, catalogService.getFullCatalog());
+        }
+        return subscription;
+    }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
index a6eb3d9..30bb487 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
@@ -20,7 +20,8 @@ import java.util.List;
 
 import org.joda.time.DateTime;
 
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index e836d59..1b42632 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -25,13 +25,15 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
 import com.ning.billing.entitlement.alignment.TimedMigration;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -54,24 +56,20 @@ import com.ning.billing.util.clock.Clock;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
+public class DefaultEntitlementMigrationApi extends EntitlementApiBase implements EntitlementMigrationApi {
 
-    private final EntitlementDao dao;
     private final MigrationPlanAligner migrationAligner;
-    private final SubscriptionFactory factory;
-    private final Clock clock;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementMigrationApi(final MigrationPlanAligner migrationAligner,
-                                          final SubscriptionFactory factory,
+                                          final SubscriptionApiService apiService,
+                                          final CatalogService catalogService,
                                           final EntitlementDao dao,
                                           final Clock clock,
                                           final InternalCallContextFactory internalCallContextFactory) {
-        this.dao = dao;
+        super(dao, apiService, clock, catalogService);
         this.migrationAligner = migrationAligner;
-        this.factory = factory;
-        this.clock = clock;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -141,13 +139,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
         final DateTime migrationStartDate = events[0].getEventTime();
         final List<EntitlementEvent> emptyEvents = Collections.emptyList();
-        final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
-                                                                                     .setId(UUID.randomUUID())
-                                                                                     .setBundleId(bundleId)
-                                                                                     .setCategory(productCategory)
-                                                                                     .setBundleStartDate(migrationStartDate)
-                                                                                     .setAlignStartDate(migrationStartDate),
-                                                                             emptyEvents);
+        final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                      .setId(UUID.randomUUID())
+                                                                                      .setBundleId(bundleId)
+                                                                                      .setCategory(productCategory)
+                                                                                      .setBundleStartDate(migrationStartDate)
+                                                                                      .setAlignStartDate(migrationStartDate),
+                                                                              emptyEvents);
         return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
     }
 
@@ -157,13 +155,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
         final DateTime migrationStartDate = events[0].getEventTime();
         final List<EntitlementEvent> emptyEvents = Collections.emptyList();
-        final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
-                                                                                     .setId(UUID.randomUUID())
-                                                                                     .setBundleId(bundleId)
-                                                                                     .setCategory(productCategory)
-                                                                                     .setBundleStartDate(bundleStartDate)
-                                                                                     .setAlignStartDate(migrationStartDate),
-                                                                             emptyEvents);
+        final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                      .setId(UUID.randomUUID())
+                                                                                      .setBundleId(bundleId)
+                                                                                      .setCategory(productCategory)
+                                                                                      .setBundleStartDate(bundleStartDate)
+                                                                                      .setAlignStartDate(migrationStartDate),
+                                                                              emptyEvents);
         return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
     }
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
index 4721c66..db0af82 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
@@ -23,10 +23,11 @@ import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
 
 public interface SubscriptionApiService {
 
@@ -54,4 +55,6 @@ public interface SubscriptionApiService {
     public boolean changePlanWithPolicy(SubscriptionData subscription, String productName, BillingPeriod term,
                                         String priceList, DateTime requestedDate, ActionPolicy policy, CallContext context)
             throws EntitlementUserApiException;
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context);
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
index 1596030..f4b0420 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
@@ -29,17 +29,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.ErrorCode;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
 import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
 
@@ -48,18 +51,17 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
+public class DefaultEntitlementInternalApi extends EntitlementApiBase implements EntitlementInternalApi {
 
     private final Logger log = LoggerFactory.getLogger(DefaultEntitlementInternalApi.class);
 
-    private final EntitlementDao dao;
-    private final SubscriptionFactory subscriptionFactory;
 
     @Inject
     public DefaultEntitlementInternalApi(final EntitlementDao dao,
-                                         final SubscriptionFactory subscriptionFactory) {
-        this.dao = dao;
-        this.subscriptionFactory = subscriptionFactory;
+                                         final DefaultSubscriptionApiService apiService,
+                                         final Clock clock,
+                                         final CatalogService catalogService) {
+        super(dao, apiService, clock, catalogService);
     }
 
     @Override
@@ -68,26 +70,31 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final InternalTenantContext context) {
-        return dao.getSubscriptions(subscriptionFactory, bundleId, context);
+    public List<Subscription> getSubscriptionsForBundle(UUID bundleId,
+                                                        InternalTenantContext context) {
+        final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, context);
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
-    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, context);
+    public Subscription getBaseSubscription(UUID bundleId,
+                                            InternalTenantContext context) throws EntitlementUserApiException {
+        final Subscription result = dao.getBaseSubscription(bundleId, context);
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final UUID id, final InternalTenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, context);
+
+    public Subscription getSubscriptionFromId(UUID id,
+                                              InternalTenantContext context) throws EntitlementUserApiException {
+        final Subscription result = dao.getSubscriptionFromId(id, context);
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
@@ -105,8 +112,9 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
     }
 
     @Override
-    public void setChargedThroughDate(final UUID subscriptionId, final LocalDate localChargedThruDate, final InternalCallContext context) {
-        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, context);
+    public void setChargedThroughDate(UUID subscriptionId,
+                                      LocalDate localChargedThruDate, InternalCallContext context) {
+        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionId, context);
         final DateTime chargedThroughDate = localChargedThruDate.toDateTime(new LocalTime(subscription.getStartDate(), DateTimeZone.UTC), DateTimeZone.UTC);
         final SubscriptionBuilder builder = new SubscriptionBuilder(subscription)
                 .setChargedThroughDate(chargedThroughDate)
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
index f61d2a6..c44617a 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.entitlement.api.timeline;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -25,21 +26,25 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.NewEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
@@ -47,17 +52,21 @@ import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
+public class DefaultEntitlementTimelineApi extends EntitlementApiBase implements EntitlementTimelineApi {
 
-    private final EntitlementDao dao;
-    private final SubscriptionFactory factory;
     private final RepairEntitlementLifecycleDao repairDao;
     private final CatalogService catalogService;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final AddonUtils addonUtils;
+
+    private final SubscriptionApiService repairApiService;
 
     private enum RepairType {
         BASE_REPAIR,
@@ -66,14 +75,17 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
     }
 
     @Inject
-    public DefaultEntitlementTimelineApi(@Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionFactory factory, final CatalogService catalogService,
+    public DefaultEntitlementTimelineApi(final CatalogService catalogService,
+                                         final SubscriptionApiService apiService,
                                          @Named(DefaultEntitlementModule.REPAIR_NAMED) final RepairEntitlementLifecycleDao repairDao, final EntitlementDao dao,
-                                         final InternalCallContextFactory internalCallContextFactory) {
+                                         @Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionApiService repairApiService,
+                                         final InternalCallContextFactory internalCallContextFactory, final Clock clock, final AddonUtils addonUtils) {
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
-        this.dao = dao;
         this.repairDao = repairDao;
-        this.factory = factory;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.repairApiService = repairApiService;
+        this.addonUtils = addonUtils;
     }
 
     @Override
@@ -101,7 +113,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             if (bundle == null) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_UNKNOWN_BUNDLE, descBundle);
             }
-            final List<Subscription> subscriptions = dao.getSubscriptions(factory, bundle.getId(), internalCallContextFactory.createInternalTenantContext(context));
+            final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(bundle.getId(), internalCallContextFactory.createInternalTenantContext(context)));
             if (subscriptions.size() == 0) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, bundle.getId());
             }
@@ -113,6 +125,18 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
+    private List<SubscriptionDataRepair> convertToSubscriptionsDataRepair(List<Subscription> input) {
+        return new ArrayList<SubscriptionDataRepair>(Collections2.transform(input, new Function<Subscription, SubscriptionDataRepair>() {
+            @Override
+            public SubscriptionDataRepair apply(@Nullable final Subscription subscription) {
+                return convertToSubscriptionDataRepair((SubscriptionData) subscription);
+            }
+        }));
+    }
+    private SubscriptionDataRepair convertToSubscriptionDataRepair(SubscriptionData input) {
+        return new SubscriptionDataRepair(input, repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+    }
+
     @Override
     public BundleTimeline repairBundle(final BundleTimeline input, final boolean dryRun, final CallContext context) throws EntitlementRepairException {
         final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(context);
@@ -123,7 +147,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             }
 
             // Subscriptions are ordered with BASE subscription first-- if exists
-            final List<Subscription> subscriptions = dao.getSubscriptions(factory, input.getId(), tenantContext);
+            final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(input.getId(), tenantContext));
             if (subscriptions.size() == 0) {
                 throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, input.getId());
             }
@@ -252,7 +276,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
-    private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+    private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
             throws EntitlementRepairException {
         if (!isBasePlanRecreate) {
             return;
@@ -269,7 +293,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         }
     }
 
-    private void validateInputSubscriptionsKnown(final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+    private void validateInputSubscriptionsKnown(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
             throws EntitlementRepairException {
         for (final SubscriptionTimeline cur : input) {
             boolean found = false;
@@ -365,7 +389,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         return result;
     }
 
-    private String getViewId(final DateTime lastUpdateBundleDate, final List<Subscription> subscriptions) {
+    private String getViewId(final DateTime lastUpdateBundleDate, final List<SubscriptionDataRepair> subscriptions) {
         final StringBuilder tmp = new StringBuilder();
         long lastOrderedId = -1;
         for (final Subscription cur : subscriptions) {
@@ -412,7 +436,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
         };
     }
 
-    private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<Subscription> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
+    private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
 
         final List<SubscriptionTimeline> result = new LinkedList<SubscriptionTimeline>();
         final Set<UUID> repairIds = new TreeSet<UUID>();
@@ -464,7 +488,9 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
             }
         }
 
-        return (SubscriptionDataRepair) factory.createSubscription(builder, initialEvents);
+        final SubscriptionDataRepair subscriptiondataRepair = new SubscriptionDataRepair(builder, curData.getEvents(), repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+        subscriptiondataRepair.rebuildTransitions(curData.getEvents(), catalogService.getFullCatalog());
+        return subscriptiondataRepair;
     }
 
     private SubscriptionTimeline findAndCreateSubscriptionRepair(final UUID target, final List<SubscriptionTimeline> input) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
index 1870bfe..9f5d397 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
@@ -16,12 +16,17 @@
 
 package com.ning.billing.entitlement.api.timeline;
 
+import org.joda.time.DateTime;
+
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 
@@ -35,7 +40,14 @@ public class RepairSubscriptionApiService extends DefaultSubscriptionApiService 
                                         @Named(DefaultEntitlementModule.REPAIR_NAMED) final EntitlementDao dao,
                                         final CatalogService catalogService,
                                         final PlanAligner planAligner,
+                                        final AddonUtils addonUtils,
                                         final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, dao, catalogService, planAligner, internalCallContextFactory);
+        super(clock, dao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+    }
+
+    // Nothing to do for repair as we pass all the repair events in the stream
+    @Override
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+        return 0;
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
index 301f15f..310469c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
@@ -24,7 +24,6 @@ import org.joda.time.DateTime;
 
 import com.ning.billing.ErrorCode;
 import com.ning.billing.ObjectType;
-import com.ning.billing.catalog.api.Catalog;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
@@ -33,8 +32,8 @@ import com.ning.billing.catalog.api.Product;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -59,8 +58,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
     private final List<EntitlementEvent> initialEvents;
     private final InternalCallContextFactory internalCallContextFactory;
 
-    // Low level events are ONLY used for Repair APIs
-    private List<EntitlementEvent> events;
 
     public SubscriptionDataRepair(final SubscriptionBuilder builder, final List<EntitlementEvent> initialEvents, final SubscriptionApiService apiService,
                                   final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
@@ -74,6 +71,20 @@ public class SubscriptionDataRepair extends SubscriptionData {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
+
+    public SubscriptionDataRepair(final SubscriptionData subscriptionData, final SubscriptionApiService apiService,
+                                  final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
+                                  final InternalCallContextFactory internalCallContextFactory) {
+        super(subscriptionData, apiService , clock);
+        this.repairDao = dao;
+        this.addonUtils = addonUtils;
+        this.clock = clock;
+        this.catalogService = catalogService;
+        this.initialEvents = subscriptionData.getEvents();
+        this.internalCallContextFactory = internalCallContextFactory;
+    }
+
     DateTime getLastUserEventEffectiveDate() {
         DateTime res = null;
         for (final EntitlementEvent cur : events) {
@@ -185,12 +196,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
         }
     }
 
-    @Override
-    public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
-        this.events = inputEvents;
-        super.rebuildTransitions(inputEvents, catalog);
-    }
-
     public List<EntitlementEvent> getEvents() {
         return events;
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
index 466d466..651b297 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
@@ -29,7 +29,8 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.BundleTimeline;
@@ -37,7 +38,8 @@ import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -58,22 +60,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
-public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
+public class DefaultEntitlementTransferApi extends EntitlementApiBase implements EntitlementTransferApi {
 
-    private final Clock clock;
-    private final EntitlementDao dao;
     private final CatalogService catalogService;
-    private final SubscriptionFactory subscriptionFactory;
     private final EntitlementTimelineApi timelineApi;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementTransferApi(final Clock clock, final EntitlementDao dao, final EntitlementTimelineApi timelineApi, final CatalogService catalogService,
-                                         final SubscriptionFactory subscriptionFactory, final InternalCallContextFactory internalCallContextFactory) {
-        this.clock = clock;
-        this.dao = dao;
+                                         final SubscriptionApiService apiService, final InternalCallContextFactory internalCallContextFactory) {
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
-        this.subscriptionFactory = subscriptionFactory;
         this.timelineApi = timelineApi;
         this.internalCallContextFactory = internalCallContextFactory;
     }
@@ -220,7 +217,7 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
             DateTime bundleStartdate = null;
 
             for (final SubscriptionTimeline cur : bundleTimeline.getSubscriptions()) {
-                final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, cur.getId(), fromInternalCallContext);
+                final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(cur.getId(), fromInternalCallContext);
                 final List<ExistingEvent> existingEvents = cur.getExistingEvents();
                 final ProductCategory productCategory = existingEvents.get(0).getPlanPhaseSpecifier().getProductCategory();
                 if (productCategory == ProductCategory.ADD_ON) {
@@ -254,13 +251,13 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
                 }
 
                 // Create the new subscription for the new bundle on the new account
-                final SubscriptionData subscriptionData = subscriptionFactory.createSubscription(new SubscriptionBuilder()
-                                                                                                         .setId(UUID.randomUUID())
-                                                                                                         .setBundleId(subscriptionBundleData.getId())
-                                                                                                         .setCategory(productCategory)
-                                                                                                         .setBundleStartDate(effectiveTransferDate)
-                                                                                                         .setAlignStartDate(subscriptionAlignStartDate),
-                                                                                                 ImmutableList.<EntitlementEvent>of());
+                final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+                                                                                              .setId(UUID.randomUUID())
+                                                                                              .setBundleId(subscriptionBundleData.getId())
+                                                                                              .setCategory(productCategory)
+                                                                                              .setBundleStartDate(effectiveTransferDate)
+                                                                                              .setAlignStartDate(subscriptionAlignStartDate),
+                                                                                      ImmutableList.<EntitlementEvent>of());
 
                 final List<EntitlementEvent> events = toEvents(existingEvents, subscriptionData, effectiveTransferDate, context);
                 final SubscriptionMigrationData curData = new SubscriptionMigrationData(subscriptionData, events, null);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
index 32d968c..4272038 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class DefaultEffectiveSubscriptionEvent extends DefaultSubscriptionEvent implements EffectiveSubscriptionInternalEvent {
+
     public DefaultEffectiveSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate, final Long accountRecordId, final Long tenantRecordId) {
         super(in, startDate, accountRecordId, tenantRecordId);
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
index d7b8d6e..d3d128c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
@@ -33,8 +33,7 @@ import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
 import com.ning.billing.entitlement.api.user.SubscriptionStatusDryRun.DryRunChangeReason;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -48,26 +47,19 @@ import com.ning.billing.util.clock.DefaultClock;
 
 import com.google.inject.Inject;
 
-public class DefaultEntitlementUserApi implements EntitlementUserApi {
+public class DefaultEntitlementUserApi extends EntitlementApiBase implements EntitlementUserApi {
 
-    private final Clock clock;
-    private final EntitlementDao dao;
     private final CatalogService catalogService;
-    private final DefaultSubscriptionApiService apiService;
     private final AddonUtils addonUtils;
-    private final SubscriptionFactory subscriptionFactory;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultEntitlementUserApi(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
-                                     final DefaultSubscriptionApiService apiService, final SubscriptionFactory subscriptionFactory,
+                                     final DefaultSubscriptionApiService apiService,
                                      final AddonUtils addonUtils, final InternalCallContextFactory internalCallContextFactory) {
-        this.clock = clock;
-        this.apiService = apiService;
-        this.dao = dao;
+        super(dao, apiService, clock, catalogService);
         this.catalogService = catalogService;
         this.addonUtils = addonUtils;
-        this.subscriptionFactory = subscriptionFactory;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -82,11 +74,11 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
     @Override
     public Subscription getSubscriptionFromId(final UUID id, final TenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription result = dao.getSubscriptionFromId(id, internalCallContextFactory.createInternalTenantContext(context));
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
     @Override
@@ -111,23 +103,26 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
     @Override
     public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) {
-        return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> internalSubscriptions = dao.getSubscriptionsForAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
     public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final TenantContext context) {
-        return dao.getSubscriptions(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        return createSubscriptionsForApiUse(internalSubscriptions);
     }
 
     @Override
     public Subscription getBaseSubscription(final UUID bundleId, final TenantContext context) throws EntitlementUserApiException {
-        final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription result = dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
         if (result == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
         }
-        return result;
+        return createSubscriptionForApiUse(result);
     }
 
+
     @Override
     public SubscriptionBundle createBundleForAccount(final UUID accountId, final String bundleName, final CallContext context)
             throws EntitlementUserApiException {
@@ -162,7 +157,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
             }
 
             DateTime bundleStartDate = null;
-            final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+            final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
             switch (plan.getProduct().getCategory()) {
                 case BASE:
                     if (baseSubscription != null) {
@@ -231,7 +226,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
     public List<SubscriptionStatusDryRun> getDryRunChangePlanStatus(final UUID subscriptionId, @Nullable final String baseProductName,
                                                                     final DateTime requestedDate, final TenantContext context)
             throws EntitlementUserApiException {
-        final Subscription subscription = dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
+        final Subscription subscription = dao.getSubscriptionFromId(subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
         if (subscription == null) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, subscriptionId);
         }
@@ -241,7 +236,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
 
         final List<SubscriptionStatusDryRun> result = new LinkedList<SubscriptionStatusDryRun>();
 
-        final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscriptionFactory, subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
+        final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
         for (final Subscription cur : bundleSubscriptions) {
             if (cur.getId().equals(subscriptionId)) {
                 continue;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 3dd5850..d60a898 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -17,6 +17,8 @@
 package com.ning.billing.entitlement.api.user;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
@@ -37,11 +39,12 @@ import com.ning.billing.catalog.api.PlanSpecifier;
 import com.ning.billing.catalog.api.PriceList;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.Product;
+import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -68,18 +71,22 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
     private final EntitlementDao dao;
     private final CatalogService catalogService;
     private final PlanAligner planAligner;
+    private final AddonUtils addonUtils;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultSubscriptionApiService(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
-                                         final PlanAligner planAligner, final InternalCallContextFactory internalCallContextFactory) {
+                                         final PlanAligner planAligner, final AddonUtils addonUtils,
+                                         final InternalCallContextFactory internalCallContextFactory) {
         this.clock = clock;
         this.catalogService = catalogService;
         this.planAligner = planAligner;
         this.dao = dao;
+        this.addonUtils = addonUtils;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     public SubscriptionData createPlan(final SubscriptionBuilder builder, final Plan plan, final PhaseType initialPhase,
                                        final String realPriceList, final DateTime requestedDate, final DateTime effectiveDate, final DateTime processedDate,
@@ -215,6 +222,9 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
         dao.cancelSubscription(subscription, cancelEvent, internalCallContext, 0);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
@@ -354,9 +364,58 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         dao.changePlan(subscription, changeEvents, internalCallContext);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
 
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+
+        // If cancellation/change occur in the future, there is nothing to do
+        final DateTime now = clock.getUTCNow();
+        if (effectiveDate.compareTo(now) > 0) {
+            return 0;
+        }
+
+        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
+
+        final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
+
+        final List<SubscriptionData> subscriptionsToBeCancelled = new LinkedList<SubscriptionData>();
+        final List<EntitlementEvent> cancelEvents = new LinkedList<EntitlementEvent>();
+
+        for (final Subscription subscription : subscriptions) {
+            final SubscriptionData cur = (SubscriptionData) subscription;
+            if (cur.getState() == SubscriptionState.CANCELLED ||
+                cur.getCategory() != ProductCategory.ADD_ON) {
+                continue;
+            }
+
+            final Plan addonCurrentPlan = cur.getCurrentPlan();
+            if (baseProduct == null ||
+                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
+                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
+                //
+                // Perform AO cancellation using the effectiveDate of the BP
+                //
+                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
+                                                                                .setSubscriptionId(cur.getId())
+                                                                                .setActiveVersion(cur.getActiveVersion())
+                                                                                .setProcessedDate(now)
+                                                                                .setEffectiveDate(effectiveDate)
+                                                                                .setRequestedDate(now)
+                                                                                .setUserToken(context.getUserToken())
+                                                                                .setFromDisk(true));
+                subscriptionsToBeCancelled.add(cur);
+                cancelEvents.add(cancelEvent);
+            }
+        }
+
+        dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, context);
+        return subscriptionsToBeCancelled.size();
+    }
+
     private void validateRequestedDate(final SubscriptionData subscription, final DateTime now, final DateTime requestedDate)
             throws EntitlementUserApiException {
 
@@ -364,7 +423,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
         }
 
-        final SubscriptionTransition  previousTransition = subscription.getPreviousTransition();
+        final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
         if (previousTransition != null && previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
                                                   requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 2903504..7d88024 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -39,7 +39,6 @@ import com.ning.billing.catalog.api.PriceList;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Kind;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Order;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.TimeLimit;
@@ -85,6 +84,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
     //
     private LinkedList<SubscriptionTransitionData> transitions;
 
+    // Low level events are ONLY used for Repair APIs
+    protected List<EntitlementEvent> events;
+
+
+    public List<EntitlementEvent> getEvents() {
+        return events;
+    }
+
     // Transient object never returned at the API
     public SubscriptionData(final SubscriptionBuilder builder) {
         this(builder, null, null);
@@ -103,6 +110,22 @@ public class SubscriptionData extends EntityBase implements Subscription {
         this.paidThroughDate = builder.getPaidThroughDate();
     }
 
+    // Used for API to make sure we have a clock and an apiService set before we return the object
+    public SubscriptionData(final SubscriptionData internalSubscription, final SubscriptionApiService apiService, final Clock clock) {
+        super(internalSubscription.getId(), internalSubscription.getCreatedDate(), internalSubscription.getUpdatedDate());
+        this.apiService = apiService;
+        this.clock = clock;
+        this.bundleId = internalSubscription.getBundleId();
+        this.alignStartDate = internalSubscription.getAlignStartDate();
+        this.bundleStartDate = internalSubscription.getBundleStartDate();
+        this.category = internalSubscription.getCategory();
+        this.activeVersion = internalSubscription.getActiveVersion();
+        this.chargedThroughDate = internalSubscription.getChargedThroughDate();
+        this.paidThroughDate = internalSubscription.getPaidThroughDate();
+        this.transitions = new LinkedList<SubscriptionTransitionData>(internalSubscription.getAllTransitions());
+        this.events = internalSubscription.getEvents();
+    }
+
     @Override
     public UUID getBundleId() {
         return bundleId;
@@ -484,13 +507,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
                 "Failed to find CurrentPhaseStart id = %s", getId().toString()));
     }
 
-    public void rebuildTransitions(final List<EntitlementEvent> inputEvents,
-            final Catalog catalog) {
+    public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
 
         if (inputEvents == null) {
             return;
         }
 
+        this.events = inputEvents;
+
         SubscriptionState nextState = null;
         String nextPlanName = null;
         String nextPhaseName = null;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 10ac6c9..ce206ed 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -28,12 +28,10 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.Product;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.util.config.EntitlementConfig;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.EntitlementService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
@@ -80,29 +78,25 @@ public class Engine implements EventListener, EntitlementService {
     private final PlanAligner planAligner;
     private final AddonUtils addonUtils;
     private final InternalBus eventBus;
-    private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
-    private final SubscriptionFactory subscriptionFactory;
     private final InternalCallContextFactory internalCallContextFactory;
-
     private NotificationQueue subscriptionEventQueue;
+    private final SubscriptionApiService apiService;
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
-                  final EntitlementConfig config,
                   final AddonUtils addonUtils, final InternalBus eventBus,
                   final NotificationQueueService notificationQueueService,
-                  final SubscriptionFactory subscriptionFactory,
-                  final InternalCallContextFactory internalCallContextFactory) {
+                  final InternalCallContextFactory internalCallContextFactory,
+                  final SubscriptionApiService apiService) {
         this.clock = clock;
         this.dao = dao;
         this.planAligner = planAligner;
         this.addonUtils = addonUtils;
-        this.config = config;
         this.eventBus = eventBus;
         this.notificationQueueService = notificationQueueService;
-        this.subscriptionFactory = subscriptionFactory;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.apiService = apiService;
     }
 
     @Override
@@ -135,22 +129,9 @@ public class Engine implements EventListener, EntitlementService {
                 }
             };
 
-            final NotificationConfig notificationConfig = new NotificationConfig() {
-                @Override
-                public long getSleepTimeMs() {
-                    return config.getSleepTimeMs();
-                }
-
-                @Override
-                public boolean isNotificationProcessingOff() {
-                    return config.isNotificationProcessingOff();
-                }
-            };
-
             subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
                                                                                       NOTIFICATION_QUEUE_NAME,
-                                                                                      queueHandler,
-                                                                                      notificationConfig);
+                                                                                      queueHandler);
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
@@ -175,7 +156,7 @@ public class Engine implements EventListener, EntitlementService {
             return;
         }
 
-        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, event.getSubscriptionId(), context);
+        final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId(), context);
         if (subscription == null) {
             log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
             return;
@@ -188,7 +169,6 @@ public class Engine implements EventListener, EntitlementService {
         //
         // Do any internal processing on that event before we send the event to the bus
         //
-
         int theRealSeqId = seqId;
         if (event.getType() == EventType.PHASE) {
             onPhaseEvent(subscription, context);
@@ -199,7 +179,7 @@ public class Engine implements EventListener, EntitlementService {
         try {
             final SubscriptionTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
             final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
-                    context.getAccountRecordId(), context.getTenantRecordId());
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
             eventBus.post(busEvent, context);
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
@@ -222,47 +202,8 @@ public class Engine implements EventListener, EntitlementService {
     }
 
     private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final InternalCallContext context) {
-        final DateTime now = clock.getUTCNow();
-        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-
-        final List<Subscription> subscriptions = dao.getSubscriptions(subscriptionFactory, baseSubscription.getBundleId(), context);
-
-        final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
-        final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
-        for (final Subscription subscription : subscriptions) {
-            final SubscriptionData cur = (SubscriptionData) subscription;
-            if (cur.getState() == SubscriptionState.CANCELLED ||
-                cur.getCategory() != ProductCategory.ADD_ON) {
-                continue;
-            }
+        return apiService.cancelAddOnsIfRequired(baseSubscription, event.getEffectiveDate(), context);
+    }
 
-            final Plan addonCurrentPlan = cur.getCurrentPlan();
-            if (baseProduct == null ||
-                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
-                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
-                //
-                // Perform AO cancellation using the effectiveDate of the BP
-                //
-                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                                                                                .setSubscriptionId(cur.getId())
-                                                                                .setActiveVersion(cur.getActiveVersion())
-                                                                                .setProcessedDate(now)
-                                                                                .setEffectiveDate(event.getEffectiveDate())
-                                                                                .setRequestedDate(now)
-                                                                                .setUserToken(context.getUserToken())
-                                                                                .setFromDisk(true));
 
-                addOnCancellations.put(cur.getId(), cancelEvent);
-                addOnCancellationSubscriptions.put(cur.getId(), cur);
-            }
-        }
-
-        final int addOnSize = addOnCancellations.size();
-        int cancelSeq = addOnSize - 1;
-        for (final UUID key : addOnCancellations.keySet()) {
-            dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
-            cancelSeq--;
-        }
-        return addOnSize;
-    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index c033364..7f077ce 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -39,19 +40,20 @@ import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.entitlement.api.transfer.TransferCancelData;
+import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
@@ -76,6 +78,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.events.RepairEntitlementInternalEvent;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -98,6 +101,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
     private final NotificationQueueService notificationQueueService;
     private final AddonUtils addonUtils;
     private final InternalBus eventBus;
+    private final CatalogService catalogService;
 
     @Inject
     public DefaultEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
@@ -107,6 +111,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         this.notificationQueueService = notificationQueueService;
         this.addonUtils = addonUtils;
         this.eventBus = eventBus;
+        this.catalogService = catalogService;
     }
 
     @Override
@@ -206,29 +211,26 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
-        return getBaseSubscription(factory, bundleId, true, context);
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
+        return getBaseSubscription(bundleId, true, context);
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
-        return buildSubscription(factory, getSubscriptionFromId(subscriptionId, context), context);
-    }
-
-    private Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
+        final Subscription shellSubscription = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
             @Override
             public Subscription inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final SubscriptionModelDao model = entitySqlDaoWrapperFactory.become(SubscriptionSqlDao.class).getById(subscriptionId.toString(), context);
                 return SubscriptionModelDao.toSubscription(model);
             }
         });
+        return buildSubscription(shellSubscription, context);
     }
 
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
-        return buildBundleSubscriptions(bundleId, factory, getSubscriptionFromBundleId(bundleId, context), context);
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
+        return buildBundleSubscriptions(bundleId, getSubscriptionFromBundleId(bundleId, context), context);
     }
 
     private List<Subscription> getSubscriptionFromBundleId(final UUID bundleId, final InternalTenantContext context) {
@@ -248,7 +250,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
                                                                final String bundleKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<Subscription>>() {
             @Override
@@ -257,7 +259,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 if (bundleModel == null) {
                     return Collections.emptyList();
                 }
-                return getSubscriptions(factory, bundleModel.getId(), context);
+                return getSubscriptions(bundleModel.getId(), context);
             }
         });
     }
@@ -389,10 +391,10 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 final EntitlementEventSqlDao eventsDaoFromSameTransaction = entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class);
                 for (final EntitlementEvent cur : initialEvents) {
                     eventsDaoFromSameTransaction.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
+
                 }
                 // Notify the Bus of the latest requested change, if needed
                 if (initialEvents.size() > 0) {
@@ -403,6 +405,8 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
+
+
     @Override
     public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -412,11 +416,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
                 for (final EntitlementEvent cur : recreateEvents) {
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
 
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -428,6 +430,24 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                for (int i = 0; i < subscriptions.size(); i++) {
+                    final SubscriptionData subscription = subscriptions.get(i);
+                    final EntitlementEvent cancelEvent = cancelEvents.get(i);
+                    cancelSubscriptionFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, context, i);
+                }
+                return null;
+            }
+        });
+    }
+
+
+
+    @Override
     public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
@@ -498,10 +518,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 for (final EntitlementEvent cur : changeEventsTweakedWithMigrateBilling) {
 
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -608,10 +627,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
         final UUID subscriptionId = subscription.getId();
         cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
         entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class).create(new EntitlementEventModelDao(cancelEvent), context);
-        recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                cancelEvent.getEffectiveDate(),
-                                                new EntitlementNotificationKey(cancelEvent.getId(), seqId),
-                                                context);
+
+        final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
+        recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, context);
 
         // Notify the Bus of the requested change
         notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, context);
@@ -663,14 +681,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
         }
     }
 
-    private Subscription buildSubscription(final SubscriptionFactory factory, final Subscription input, final InternalTenantContext context) {
+    private Subscription buildSubscription(final Subscription input, final InternalTenantContext context) {
         if (input == null) {
             return null;
         }
 
         final List<Subscription> bundleInput = new ArrayList<Subscription>();
         if (input.getCategory() == ProductCategory.ADD_ON) {
-            final Subscription baseSubscription = getBaseSubscription(factory, input.getBundleId(), false, context);
+            final Subscription baseSubscription = getBaseSubscription(input.getBundleId(), false, context);
             if (baseSubscription == null) {
                 return null;
             }
@@ -681,7 +699,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
             bundleInput.add(input);
         }
 
-        final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), factory, bundleInput, context);
+        final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), bundleInput, context);
         for (final Subscription cur : reloadedSubscriptions) {
             if (cur.getId().equals(input.getId())) {
                 return cur;
@@ -691,7 +709,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         throw new EntitlementError("Unexpected code path in buildSubscription");
     }
 
-    private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final SubscriptionFactory factory, final List<Subscription> input, final InternalTenantContext context) {
+    private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final List<Subscription> input, final InternalTenantContext context) {
         if (input == null || input.size() == 0) {
             return Collections.emptyList();
         }
@@ -714,7 +732,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         final List<Subscription> result = new ArrayList<Subscription>(input.size());
         for (final Subscription cur : input) {
             final List<EntitlementEvent> events = getEventsForSubscription(cur.getId(), context);
-            Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+            Subscription reloaded = createSubscriptionForInternalUse(cur, events);
 
             switch (cur.getCategory()) {
                 case BASE:
@@ -752,7 +770,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
                         events.add(addOnCancelEvent);
                         // Finally reload subscription with full set of events
-                        reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+                        reloaded = createSubscriptionForInternalUse(cur, events);
                     }
                     break;
                 default:
@@ -838,26 +856,58 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
-    private Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
+    private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
+        final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
+        if (events.size() > 0) {
+            result.rebuildTransitions(events, catalogService.getFullCatalog());
+        }
+        return result;
+    }
+
+    private Subscription getBaseSubscription(final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
         final List<Subscription> subscriptions = getSubscriptionFromBundleId(bundleId, context);
         for (final Subscription cur : subscriptions) {
             if (cur.getCategory() == ProductCategory.BASE) {
-                return rebuildSubscription ? buildSubscription(factory, cur, context) : cur;
+                return rebuildSubscription ? buildSubscription(cur, context) : cur;
             }
         }
         return null;
     }
 
-    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
-                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+
+    //
+    // Either records a notfication or sends a bus event is operation is immediate
+    //
+    private void recordBusOrFutureNotificationFromTransaction(final SubscriptionData subscription, final EntitlementEvent event, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final boolean busEvent,
+                                                              final int seqId, final InternalCallContext context) {
+        if (busEvent) {
+            notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+        } else {
+            recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+                                                    event.getEffectiveDate(),
+                                                    new EntitlementNotificationKey(event.getId()),
+                                                    context);
+        }
+    }
+
+    //
+    // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
+    // - CREATE,
+    // - IMM CANCEL or CHANGE
+    //
+    private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
+                                            final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
         try {
-            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
-                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
-        } catch (NoSuchNotificationQueue e) {
-            throw new RuntimeException(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+            final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
+
+            final SubscriptionTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
+            final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
+
+
+            eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+        } catch (EventBusException e) {
+            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
         }
     }
 
@@ -870,6 +920,18 @@ public class DefaultEntitlementDao implements EntitlementDao {
         }
     }
 
+    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
+                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+        try {
+            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
     private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
                                                   final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
 
@@ -902,4 +964,20 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
         transBundleDao.create(new SubscriptionBundleModelDao(bundleData), context);
     }
+
+    //
+    // Creates a copy of the existing subscriptions whose 'transitions' will reflect the new event
+    //
+    private SubscriptionData createSubscriptionWithNewEvent(final SubscriptionData subscription, EntitlementEvent newEvent) {
+
+        final SubscriptionData subscriptionWithNewEvent = new SubscriptionData(subscription, null, clock);
+        final List<EntitlementEvent> allEvents = new LinkedList<EntitlementEvent>();
+        if (subscriptionWithNewEvent.getEvents() != null) {
+            allEvents.addAll(subscriptionWithNewEvent.getEvents());
+        }
+        allEvents.add(newEvent);
+        subscriptionWithNewEvent.rebuildTransitions(allEvents, catalogService.getFullCatalog());
+        return subscriptionWithNewEvent;
+    }
+
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 1c17929..2b83ab2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -20,7 +20,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
@@ -46,17 +45,17 @@ public interface EntitlementDao {
 
     public SubscriptionBundle createSubscriptionBundle(SubscriptionBundleData bundle, InternalCallContext context);
 
-    public Subscription getSubscriptionFromId(SubscriptionFactory factory, UUID subscriptionId, InternalTenantContext context);
+    public Subscription getSubscriptionFromId(UUID subscriptionId, InternalTenantContext context);
 
     // ACCOUNT retrieval
     public UUID getAccountIdFromSubscriptionId(UUID subscriptionId, InternalTenantContext context);
 
     // Subscription retrieval
-    public Subscription getBaseSubscription(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+    public Subscription getBaseSubscription(UUID bundleId, InternalTenantContext context);
 
-    public List<Subscription> getSubscriptions(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+    public List<Subscription> getSubscriptions(UUID bundleId, InternalTenantContext context);
 
-    public List<Subscription> getSubscriptionsForAccountAndKey(SubscriptionFactory factory, UUID accountId, String bundleKey, InternalTenantContext context);
+    public List<Subscription> getSubscriptionsForAccountAndKey(UUID accountId, String bundleKey, InternalTenantContext context);
 
     // Update
     public void updateChargedThroughDate(SubscriptionData subscription, InternalCallContext context);
@@ -79,6 +78,8 @@ public interface EntitlementDao {
 
     public void cancelSubscription(SubscriptionData subscription, EntitlementEvent cancelEvent, InternalCallContext context, int cancelSeq);
 
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context);
+
     public void uncancelSubscription(SubscriptionData subscription, List<EntitlementEvent> uncancelEvents, InternalCallContext context);
 
     public void changePlan(SubscriptionData subscription, List<EntitlementEvent> changeEvents, InternalCallContext context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
index 0f0e18d..b208f18 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
@@ -21,8 +21,8 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.util.dao.TableName;
 import com.ning.billing.util.entity.EntityBase;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index b4d3899..38695ac 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -176,6 +175,10 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         addEvents(subscription.getId(), changeEvents);
     }
@@ -228,7 +231,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
@@ -238,17 +241,17 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
                                                                final String bundleKey, final InternalTenantContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
index 459cae7..f3dc0b9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
@@ -24,7 +24,6 @@ import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.EntitlementService;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
 import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
@@ -32,12 +31,10 @@ import com.ning.billing.entitlement.api.timeline.DefaultEntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
 import com.ning.billing.entitlement.api.timeline.RepairSubscriptionApiService;
-import com.ning.billing.entitlement.api.timeline.RepairSubscriptionFactory;
 import com.ning.billing.entitlement.api.transfer.DefaultEntitlementTransferApi;
 import com.ning.billing.entitlement.api.transfer.EntitlementTransferApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.core.Engine;
@@ -68,9 +65,6 @@ public class DefaultEntitlementModule extends AbstractModule implements Entitlem
 
     protected void installEntitlementCore() {
 
-        bind(SubscriptionFactory.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionFactory.class).asEagerSingleton();
-        bind(SubscriptionFactory.class).to(DefaultSubscriptionFactory.class).asEagerSingleton();
-
         bind(SubscriptionApiService.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionApiService.class).asEagerSingleton();
         bind(SubscriptionApiService.class).to(DefaultSubscriptionApiService.class).asEagerSingleton();
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
index 7a3ceaf..38c949f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
@@ -33,8 +33,8 @@ import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.io.VersionedCatalogLoader;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.util.config.CatalogConfig;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
@@ -164,7 +164,7 @@ public class TestPlanAligner extends KillbillTestSuite {
     }
 
     private SubscriptionData createSubscriptionStartedInThePast(final String productName, final PhaseType phaseType) {
-        final DefaultSubscriptionFactory.SubscriptionBuilder builder = new DefaultSubscriptionFactory.SubscriptionBuilder();
+        final SubscriptionBuilder builder = new SubscriptionBuilder();
         builder.setBundleStartDate(clock.getUTCNow().minusHours(10));
         // Make sure to set the dates apart
         builder.setAlignStartDate(new DateTime(builder.getBundleStartDate().plusHours(5)));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 0b95f47..bd05062 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -275,6 +275,10 @@ public abstract class TestApiBase extends EntitlementTestSuiteWithEmbeddedDB imp
                                                                                                    new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSet, null),
                                                                                                    requestedDate == null ? clock.getUTCNow() : requestedDate, callContext);
         assertNotNull(subscription);
+
+
+        //try {Thread.sleep(100000000); } catch (Exception e) {};
+
         assertTrue(testListener.isCompleted(5000));
         return subscription;
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
index db3df23..8052a4f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
@@ -342,7 +342,7 @@ public class TestRepairBP extends TestApiBaseRepair {
 
         assertEquals(dryRunBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
         assertEquals(dryRunBaseSubscription.getBundleId(), bundle.getId());
-        assertEquals(dryRunBaseSubscription.getStartDate(), baseSubscription.getStartDate());
+        assertTrue(dryRunBaseSubscription.getStartDate().compareTo(baseSubscription.getStartDate()) == 0);
 
         Plan currentPlan = dryRunBaseSubscription.getCurrentPlan();
         assertNotNull(currentPlan);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
index 2f21fe2..cf792cc 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
@@ -35,11 +35,11 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.EntitlementTestSuite;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
 import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
 import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
@@ -63,10 +63,10 @@ public class TestDefaultEntitlementTransferApi extends EntitlementTestSuite {
     public void setUp() throws Exception {
         final EntitlementDao dao = Mockito.mock(EntitlementDao.class);
         final CatalogService catalogService = new MockCatalogService(new MockCatalog());
-        final SubscriptionFactory subscriptionFactory = Mockito.mock(SubscriptionFactory.class);
+        final SubscriptionApiService apiService =  Mockito.mock(SubscriptionApiService.class);
         final EntitlementTimelineApi timelineApi = Mockito.mock(EntitlementTimelineApi.class);
         final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(IDBI.class), clock);
-        transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, subscriptionFactory, internalCallContextFactory);
+        transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, apiService, internalCallContextFactory);
     }
 
     @Test(groups = "fast")
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 408a0f9..a7f3191 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -33,14 +33,13 @@ import org.slf4j.LoggerFactory;
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.catalog.api.TimeUnit;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.entitlement.api.transfer.TransferCancelData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -141,10 +140,10 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+    public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
         for (final Subscription cur : subscriptions) {
             if (cur.getId().equals(subscriptionId)) {
-                return buildSubscription(factory, (SubscriptionData) cur, context);
+                return buildSubscription((SubscriptionData) cur, context);
             }
         }
         return null;
@@ -156,11 +155,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId, final String bundleKey, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final InternalTenantContext context) {
 
         for (final SubscriptionBundle cur : bundles) {
             if (cur.getExternalKey().equals(bundleKey) && cur.getAccountId().equals(bundleKey)) {
-                return getSubscriptions(factory, cur.getId(), context);
+                return getSubscriptions(cur.getId(), context);
             }
         }
         return Collections.emptyList();
@@ -175,7 +174,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
                 recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()), context);
             }
         }
-        final Subscription updatedSubscription = buildSubscription(null, subscription, context);
+        final Subscription updatedSubscription = buildSubscription(subscription, context);
         subscriptions.add(updatedSubscription);
     }
 
@@ -190,11 +189,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
         final List<Subscription> results = new ArrayList<Subscription>();
         for (final Subscription cur : subscriptions) {
             if (cur.getBundleId().equals(bundleId)) {
-                results.add(buildSubscription(factory, (SubscriptionData) cur, context));
+                results.add(buildSubscription((SubscriptionData) cur, context));
             }
         }
         return results;
@@ -229,11 +228,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
-    public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+    public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
         for (final Subscription cur : subscriptions) {
             if (cur.getBundleId().equals(bundleId) &&
                 cur.getCurrentPlan().getProduct().getCategory() == ProductCategory.BASE) {
-                return buildSubscription(factory, (SubscriptionData) cur, context);
+                return buildSubscription((SubscriptionData) cur, context);
             }
         }
         return null;
@@ -245,16 +244,13 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
         insertEvent(nextPhase, context);
     }
 
-    private Subscription buildSubscription(final SubscriptionFactory factory, final SubscriptionData in, final InternalTenantContext context) {
-        if (factory != null) {
-            return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId(), context));
-        } else {
+    private Subscription buildSubscription(final SubscriptionData in, final InternalTenantContext context) {
             final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
             if (events.size() > 0) {
                 subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
             }
             return subscription;
-        }
+
     }
 
     @Override
@@ -284,6 +280,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+        synchronized (events) {
+            for (int i = 0; i < subscriptions.size(); i++) {
+                cancelSubscription(subscriptions.get(i), cancelEvents.get(i), context, 0);
+            }
+        }
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         synchronized (events) {
             cancelNextChangeEvent(subscription.getId());
@@ -303,7 +308,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     private void cancelNextPhaseEvent(final UUID subscriptionId, final InternalTenantContext context) {
-        final Subscription curSubscription = getSubscriptionFromId(null, subscriptionId, context);
+        final Subscription curSubscription = getSubscriptionFromId(subscriptionId, context);
         if (curSubscription.getCurrentPhase() == null ||
             curSubscription.getCurrentPhase().getDuration().getUnit() == TimeUnit.UNLIMITED) {
             return;
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index fff17a9..a580f01 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,6 +17,7 @@
 package com.ning.billing.entitlement.glue;
 
 import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.IDBI;
 
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -28,6 +29,7 @@ import com.ning.billing.util.callcontext.MockCallContextSqlDao;
 import com.ning.billing.util.glue.BusModule;
 import com.ning.billing.util.glue.BusModule.BusType;
 import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.name.Names;
@@ -44,6 +46,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
 
     private void installNotificationQueue() {
         bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
+    }
+
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
     }
 
     protected void installDBI() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index fa5da08..3f43198 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -305,6 +305,11 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
         return generator.generateInvoice(account, invoice, manualPay);
     }
 
+    @Override
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context) {
+        dao.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, internalCallContextFactory.createInternalCallContext(accountId, context));
+    }
+
     private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final InternalCallContext context) {
         try {
             eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()), context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 02b3ec5..f5aa44e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.invoice.dao;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -56,6 +57,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 
 public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, InvoiceApiException> implements InvoiceDao {
@@ -198,13 +200,16 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                         transInvoiceItemSqlDao.create(invoiceItemModelDao, context);
                     }
 
+                    // Now we check whether we generated any credit that could be used on some unpaid invoices
+                    useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
+
                     notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions, context.getUserToken());
 
                     // Create associated payments
                     final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
                     invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments, context);
-                }
 
+                }
                 return null;
             }
         });
@@ -258,18 +263,12 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<InvoiceModelDao>>() {
             @Override
             public List<InvoiceModelDao> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                final List<InvoiceModelDao> invoices = getAllInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
-                final Collection<InvoiceModelDao> unpaidInvoices = Collections2.filter(invoices, new Predicate<InvoiceModelDao>() {
-                    @Override
-                    public boolean apply(final InvoiceModelDao in) {
-                        return (InvoiceModelDaoHelper.getBalance(in).compareTo(BigDecimal.ZERO) >= 1) && (upToDate == null || !in.getTargetDate().isAfter(upToDate));
-                    }
-                });
-                return new ArrayList<InvoiceModelDao>(unpaidInvoices);
+                return getUnpaidInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, upToDate, context);
             }
         });
     }
 
+
     @Override
     public UUID getInvoiceIdByPaymentId(final UUID paymentId, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<UUID>() {
@@ -291,7 +290,6 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
     }
 
     @Override
-
     public InvoicePaymentModelDao createRefund(final UUID paymentId, final BigDecimal requestedRefundAmount, final boolean isInvoiceAdjusted,
                                                final Map<UUID, BigDecimal> invoiceItemIdsWithNullAmounts, final UUID paymentCookieId,
                                                final InternalCallContext context)
@@ -367,6 +365,9 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                     }
                 }
 
+                // Now we check whether we have any credit that could be used on some unpaid invoices (for which payment was just refunded)
+                useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
+
                 // Notify the bus since the balance of the invoice changed
                 notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, invoice.getId(), invoice.getAccountId(), context.getUserToken(), context);
 
@@ -495,18 +496,20 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                 final InvoicePaymentModelDao payment = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class).getById(invoicePaymentId.toString(), context);
                 if (payment == null) {
                     throw new InvoiceApiException(ErrorCode.INVOICE_PAYMENT_NOT_FOUND, invoicePaymentId.toString());
-                } else {
-                    final InvoicePaymentModelDao chargeBack = new InvoicePaymentModelDao(UUID.randomUUID(), context.getCreatedDate(), InvoicePaymentType.CHARGED_BACK,
-                                                                                         payment.getInvoiceId(), payment.getPaymentId(), context.getCreatedDate(),
-                                                                                         requestedChargedBackAmout.negate(), payment.getCurrency(), null, payment.getId());
-                    transactional.create(chargeBack, context);
+                }
+                final InvoicePaymentModelDao chargeBack = new InvoicePaymentModelDao(UUID.randomUUID(), context.getCreatedDate(), InvoicePaymentType.CHARGED_BACK,
+                                                                                     payment.getInvoiceId(), payment.getPaymentId(), context.getCreatedDate(),
+                                                                                     requestedChargedBackAmout.negate(), payment.getCurrency(), null, payment.getId());
+                transactional.create(chargeBack, context);
 
-                    // Notify the bus since the balance of the invoice changed
-                    final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
-                    notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, payment.getInvoiceId(), accountId, context.getUserToken(), context);
+                // Notify the bus since the balance of the invoice changed
+                final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
+                notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, payment.getInvoiceId(), accountId, context.getUserToken(), context);
 
-                    return chargeBack;
-                }
+                // Now we check whether we have any credit that could be used on some unpaid invoices (for which payment was just charged back)
+                useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+
+                return chargeBack;
             }
         });
     }
@@ -624,18 +627,8 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                 }
                 populateChildren(invoice, entitySqlDaoWrapperFactory, context);
 
-                final BigDecimal accountCbaAvailable = getAccountCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
-                final BigDecimal balance = InvoiceModelDaoHelper.getBalance(invoice);
-                if (accountCbaAvailable.compareTo(BigDecimal.ZERO) > 0 && balance.compareTo(BigDecimal.ZERO) > 0) {
-                    final BigDecimal cbaAmountToConsume = accountCbaAvailable.compareTo(balance) > 0 ? balance.negate() : accountCbaAvailable.negate();
-                    final InvoiceItemModelDao cbaAdjItem = new InvoiceItemModelDao(context.getCreatedDate(), InvoiceItemType.CBA_ADJ,
-                                                                                   invoice.getId(), invoice.getAccountId(),
-                                                                                   null, null, null, null,
-                                                                                   context.getCreatedDate().toLocalDate(),
-                                                                                   null, cbaAmountToConsume, null,
-                                                                                   invoice.getCurrency(), null);
-                    transInvoiceItemDao.create(cbaAdjItem, context);
-                }
+                // Now we check whether we have any credit that could be used towards that charge
+                useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
 
                 // Notify the bus since the balance of the invoice changed
                 notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, invoiceId, accountId, context.getUserToken(), context);
@@ -798,6 +791,68 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         });
     }
 
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+                return null;
+            }
+        });
+    }
+
+    private void useExistingCBAFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws InvoiceApiException, EntityPersistenceException {
+
+        final BigDecimal accountCBA = getAccountCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+        if (accountCBA.compareTo(BigDecimal.ZERO) <= 0) {
+            return;
+        }
+
+        final List<InvoiceModelDao> unpaidInvoices = getUnpaidInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, null, context);
+        // We order the same os BillingStateCalculator-- should really share the comparator
+        final List<InvoiceModelDao> orderedUnpaidInvoices = Ordering.from(new Comparator<InvoiceModelDao>() {
+            @Override
+            public int compare(final InvoiceModelDao i1, final InvoiceModelDao i2) {
+                return i1.getInvoiceDate().compareTo(i2.getInvoiceDate());
+            }
+        }).immutableSortedCopy(unpaidInvoices);
+
+        BigDecimal remainingAccountCBA = accountCBA;
+        for (InvoiceModelDao cur : orderedUnpaidInvoices) {
+            final BigDecimal curInvoiceBalance = InvoiceModelDaoHelper.getBalance(cur);
+            final BigDecimal cbaToApplyOnInvoice = remainingAccountCBA.compareTo(curInvoiceBalance) <= 0 ? remainingAccountCBA : curInvoiceBalance;
+            remainingAccountCBA = remainingAccountCBA.subtract(cbaToApplyOnInvoice);
+
+
+            final InvoiceItemModelDao cbaAdjItem = new InvoiceItemModelDao(context.getCreatedDate(), InvoiceItemType.CBA_ADJ,
+                                                                           cur.getId(), cur.getAccountId(),
+                                                                           null, null, null, null,
+                                                                           context.getCreatedDate().toLocalDate(),
+                                                                           null, cbaToApplyOnInvoice.negate(), null,
+                                                                           cur.getCurrency(), null);
+
+            final InvoiceItemSqlDao transInvoiceItemDao = entitySqlDaoWrapperFactory.become(InvoiceItemSqlDao.class);
+            transInvoiceItemDao.create(cbaAdjItem, context);
+
+            if (remainingAccountCBA.compareTo(BigDecimal.ZERO) <= 0) {
+                break;
+            }
+        }
+    }
+
+
+    private List<InvoiceModelDao> getUnpaidInvoicesByAccountFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final LocalDate upToDate, final InternalTenantContext context) {
+        final List<InvoiceModelDao> invoices = getAllInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+        final Collection<InvoiceModelDao> unpaidInvoices = Collections2.filter(invoices, new Predicate<InvoiceModelDao>() {
+            @Override
+            public boolean apply(final InvoiceModelDao in) {
+                return (InvoiceModelDaoHelper.getBalance(in).compareTo(BigDecimal.ZERO) >= 1) && (upToDate == null || !in.getTargetDate().isAfter(upToDate));
+            }
+        });
+        return new ArrayList<InvoiceModelDao>(unpaidInvoices);
+    }
+
+
     /**
      * Create an adjustment for a given invoice item. This just creates the object in memory, it doesn't write it to disk.
      *
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
index dbaa474..3e76427 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
@@ -163,4 +163,11 @@ public interface InvoiceDao {
     void deleteCBA(UUID accountId, UUID invoiceId, UUID invoiceItemId, InternalCallContext context) throws InvoiceApiException;
 
     void notifyOfPayment(InvoicePaymentModelDao invoicePayment, InternalCallContext context);
+
+    /**
+     *
+     * @param accountId the account for which we need to rebalance the CBA
+     * @param context the callContext
+     */
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context);
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 757859d..464376d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -68,17 +68,6 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
     @Override
     public void initialize() throws NotificationQueueAlreadyExists {
-        final NotificationConfig notificationConfig = new NotificationConfig() {
-            @Override
-            public long getSleepTimeMs() {
-                return config.getSleepTimeMs();
-            }
-
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return config.isNotificationProcessingOff();
-            }
-        };
 
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
@@ -108,8 +97,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
         nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                             NEXT_BILLING_DATE_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler,
-                                                                            notificationConfig);
+                                                                            notificationQueueHandler);
     }
 
     @Override
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 8ca2f72..39e6e1d 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -67,16 +67,6 @@ public class InvoiceDaoTestBase extends InvoicingTestBase {
 
     private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
         @Override
-        public long getSleepTimeMs() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean isNotificationProcessingOff() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
         public int getNumberOfMonthsInFuture() {
             return 36;
         }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 6861d13..9371dce 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -182,6 +182,10 @@ public class MockInvoiceDao implements InvoiceDao {
     }
 
     @Override
+    public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+    }
+
+    @Override
     public BigDecimal getAccountBalance(final UUID accountId, final InternalTenantContext context) {
         BigDecimal balance = BigDecimal.ZERO;
 
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
index 43269b5..736bfe3 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
@@ -631,14 +631,16 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
         final boolean partialRefund = refundAmount.compareTo(amount) < 0;
         final BigDecimal cba = invoiceDao.getAccountCBA(accountId, internalCallContext);
         final InvoiceModelDao savedInvoice = invoiceDao.getById(invoice.getId(), internalCallContext);
-        assertEquals(cba.compareTo(new BigDecimal("20.0")), 0);
+
+        final BigDecimal expectedCba = balance.compareTo(BigDecimal.ZERO) < 0 ? balance.negate() : BigDecimal.ZERO;
+        assertEquals(cba.compareTo(expectedCba), 0);
         if (partialRefund) {
             // IB = 20 (rec) - 20 (repair) + 20 (cba) - (20 -7) = 7;  AB = IB - CBA = 7 - 20 = -13
             assertEquals(balance.compareTo(new BigDecimal("-13.0")), 0);
-            assertEquals(savedInvoice.getInvoiceItems().size(), 3);
+            assertEquals(savedInvoice.getInvoiceItems().size(), 4);
         } else {
             assertEquals(balance.compareTo(new BigDecimal("0.0")), 0);
-            assertEquals(savedInvoice.getInvoiceItems().size(), 3);
+            assertEquals(savedInvoice.getInvoiceItems().size(), 4);
         }
     }
 
@@ -730,7 +732,8 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
         balance = invoiceDao.getAccountBalance(accountId, internalCallContext);
         assertEquals(balance.compareTo(expectedFinalBalance), 0);
         cba = invoiceDao.getAccountCBA(accountId, internalCallContext);
-        assertEquals(cba.compareTo(new BigDecimal("10.00")), 0);
+        final BigDecimal expectedCba = balance.compareTo(BigDecimal.ZERO) < 0 ? balance.negate() : BigDecimal.ZERO;
+        assertEquals(cba.compareTo(expectedCba), 0);
     }
 
     @Test(groups = "slow")
@@ -738,13 +741,8 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
 
         final UUID accountId = UUID.randomUUID();
         final UUID bundleId = UUID.randomUUID();
-        final LocalDate targetDate1 = new LocalDate(2011, 10, 6);
-        final Invoice invoice1 = new DefaultInvoice(accountId, clock.getUTCToday(), targetDate1, Currency.USD);
-        createInvoice(invoice1, true, internalCallContext);
 
-        // CREATE INVOICE WITH A (just) CBA. Should not happen, but that does not matter for that test
-        final CreditBalanceAdjInvoiceItem cbaItem = new CreditBalanceAdjInvoiceItem(invoice1.getId(), accountId, new LocalDate(), new BigDecimal("20.0"), Currency.USD);
-        createInvoiceItem(cbaItem, internalCallContext);
+        invoiceDao.insertCredit(accountId, null,  new BigDecimal("20.0"), new LocalDate(), Currency.USD, internalCallContext);
 
         final InvoiceItemModelDao charge = invoiceDao.insertExternalCharge(accountId, null, bundleId, "bla", new BigDecimal("15.0"), clock.getUTCNow().toLocalDate(), Currency.USD, internalCallContext);
 
@@ -1358,7 +1356,7 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
     }
 
     @Test(groups = "slow")
-    public void testDeleteCBAPartiallyConsumed() throws Exception {
+    public void testRefundWithCBAPartiallyConsumed() throws Exception {
         final UUID accountId = UUID.randomUUID();
 
         // Create invoice 1
@@ -1377,6 +1375,12 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
         final CreditBalanceAdjInvoiceItem creditBalanceAdjInvoiceItem1 = new CreditBalanceAdjInvoiceItem(fixedItem1.getInvoiceId(), fixedItem1.getAccountId(),
                                                                                                          fixedItem1.getStartDate(), fixedItem1.getAmount(),
                                                                                                          fixedItem1.getCurrency());
+
+        final UUID paymentId = UUID.randomUUID();
+        final DefaultInvoicePayment defaultInvoicePayment = new DefaultInvoicePayment(InvoicePaymentType.ATTEMPT, paymentId, invoice1.getId(), clock.getUTCNow().plusDays(12), new BigDecimal("10.0"), Currency.USD);
+
+        invoiceDao.notifyOfPayment(new InvoicePaymentModelDao(defaultInvoicePayment), internalCallContext);
+
         createInvoice(invoice1, true, internalCallContext);
         createInvoiceItem(fixedItem1, internalCallContext);
         createInvoiceItem(repairAdjInvoiceItem, internalCallContext);
@@ -1398,20 +1402,20 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
 
         // Verify scenario - half of the CBA should have been used
         Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 5.00);
-        verifyInvoice(invoice1.getId(), 10.00, 10.00);
+        verifyInvoice(invoice1.getId(), 0.00, 10.00);
         verifyInvoice(invoice2.getId(), 0.00, -5.00);
 
-        // Delete the CBA on invoice 1
-        invoiceDao.deleteCBA(accountId, invoice1.getId(), creditBalanceAdjInvoiceItem1.getId(), internalCallContext);
+        // Refund Payment before we can deleted CBA
+        invoiceDao.createRefund(paymentId, new BigDecimal("10.0"), false, ImmutableMap.<UUID,BigDecimal>of(), UUID.randomUUID(), internalCallContext);
 
         // Verify all three invoices were affected
         Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
-        verifyInvoice(invoice1.getId(), 0.00, 0.00);
-        verifyInvoice(invoice2.getId(), 5.00, 0.00);
+        verifyInvoice(invoice1.getId(), 5.00, 5.00);
+        verifyInvoice(invoice2.getId(), 0.00, -5.00);
     }
 
     @Test(groups = "slow")
-    public void testDeleteCBAFullyConsumedTwice() throws Exception {
+    public void testRefundCBAFullyConsumedTwice() throws Exception {
         final UUID accountId = UUID.randomUUID();
 
         // Create invoice 1
@@ -1435,6 +1439,13 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
         createInvoiceItem(repairAdjInvoiceItem, internalCallContext);
         createInvoiceItem(creditBalanceAdjInvoiceItem1, internalCallContext);
 
+
+        final BigDecimal paymentAmount = new BigDecimal("10.00");
+        final UUID paymentId = UUID.randomUUID();
+
+        final DefaultInvoicePayment defaultInvoicePayment = new DefaultInvoicePayment(InvoicePaymentType.ATTEMPT, paymentId, invoice1.getId(), clock.getUTCNow().plusDays(12), paymentAmount, Currency.USD);
+        invoiceDao.notifyOfPayment(new InvoicePaymentModelDao(defaultInvoicePayment), internalCallContext);
+
         // Create invoice 2
         // Scenario: single item
         // * $5 item
@@ -1465,18 +1476,17 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
 
         // Verify scenario - all CBA should have been used
         Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
-        verifyInvoice(invoice1.getId(), 10.00, 10.00);
+        verifyInvoice(invoice1.getId(), 0.00, 10.00);
         verifyInvoice(invoice2.getId(), 0.00, -5.00);
         verifyInvoice(invoice3.getId(), 0.00, -5.00);
 
-        // Delete the CBA on invoice 1
-        invoiceDao.deleteCBA(accountId, invoice1.getId(), creditBalanceAdjInvoiceItem1.getId(), internalCallContext);
+        invoiceDao.createRefund(paymentId, paymentAmount, false, ImmutableMap.<UUID, BigDecimal>of(), UUID.randomUUID(), internalCallContext);
 
         // Verify all three invoices were affected
         Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
-        verifyInvoice(invoice1.getId(), 0.00, 0.00);
-        verifyInvoice(invoice2.getId(), 5.00, 0.00);
-        verifyInvoice(invoice3.getId(), 5.00, 0.00);
+        verifyInvoice(invoice1.getId(), 10.00, 10.00);
+        verifyInvoice(invoice2.getId(), 0.00, -5.00);
+        verifyInvoice(invoice3.getId(), 0.00, -5.00);
     }
 
     @Test(groups = "slow")
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
index 5864d95..9c4087a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
@@ -82,11 +82,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
         final Clock clock = new DefaultClock();
         final InvoiceConfig invoiceConfig = new InvoiceConfig() {
             @Override
-            public long getSleepTimeMs() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
             public int getNumberOfMonthsInFuture() {
                 return 36;
             }
@@ -95,11 +90,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
             public boolean isEmailNotificationsEnabled() {
                 return false;
             }
-
-            @Override
-            public boolean isNotificationProcessingOff() {
-                throw new UnsupportedOperationException();
-            }
         };
         this.generator = new DefaultInvoiceGenerator(clock, invoiceConfig);
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
index 1fd99d3..c5c7168 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
@@ -67,21 +67,11 @@ public class TestDefaultInvoiceGeneratorUnit extends InvoicingTestBase {
         clock = new ClockMock();
         gen = new TestDefaultInvoiceGeneratorMock(clock, new InvoiceConfig() {
             @Override
-            public boolean isNotificationProcessingOff() {
-                return false;
-            }
-
-            @Override
             public boolean isEmailNotificationsEnabled() {
                 return false;
             }
 
             @Override
-            public long getSleepTimeMs() {
-                return 100;
-            }
-
-            @Override
             public int getNumberOfMonthsInFuture() {
                 return 5;
             }
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
index 0162c8f..f07276d 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
@@ -332,6 +332,27 @@ public class AccountResource extends JaxRsResourceBase {
         return Response.status(Status.OK).build();
     }
 
+
+    /*
+     * ************************** INVOICE CBA REBALANCING ********************************
+     */
+    @POST
+    @Path("/{accountId:" + UUID_PATTERN + "}/" + CBA_REBALANCING)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON)
+    public Response rebalanceExistingCBAOnAccount(@PathParam("accountId") final String accountIdString,
+                                   @HeaderParam(HDR_CREATED_BY) final String createdBy,
+                                  @HeaderParam(HDR_REASON) final String reason,
+                                  @HeaderParam(HDR_COMMENT) final String comment,
+                                  @javax.ws.rs.core.Context final HttpServletRequest request) throws AccountApiException {
+
+        final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+        final UUID accountId = UUID.fromString(accountIdString);
+
+        invoiceApi.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, callContext);
+        return Response.status(Status.OK).build();
+    }
+
     /*
      * ************************** PAYMENTS ********************************
      */
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 8384cf9..7d0ec23 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -132,4 +132,7 @@ public interface JaxrsResource {
 
     public static final String EXPORT = "export";
     public static final String EXPORT_PATH = PREFIX + "/" + EXPORT;
+
+    public static final String CBA_REBALANCING = "cbaRebalancing";
+
 }
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 67e783e..d107468 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -56,18 +56,6 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
 
     @Override
     public void initialize() {
-        final NotificationConfig notificationConfig = new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return config.isNotificationProcessingOff();
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return config.getSleepTimeMs();
-            }
-        };
-
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -89,8 +77,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
         try {
             overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
                                                                             OVERDUE_CHECK_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler,
-                                                                            notificationConfig);
+                                                                            notificationQueueHandler);
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index 34aca06..9f0955c 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -66,6 +66,7 @@ import com.ning.billing.util.email.templates.TemplateModule;
 import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.globallocker.MySqlGlobalLocker;
 import com.ning.billing.util.glue.BusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
@@ -120,7 +121,6 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
                 super.configure();
                 bind(Clock.class).to(ClockMock.class).asEagerSingleton();
                 bind(CallContextFactory.class).to(DefaultCallContextFactory.class).asEagerSingleton();
-                bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
                 final InvoiceConfig invoiceConfig = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
                 bind(InvoiceConfig.class).toInstance(invoiceConfig);
                 final CatalogConfig catalogConfig = new ConfigurationObjectFactory(System.getProperties()).build(CatalogConfig.class);
@@ -136,7 +136,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
                 install(new MockJunctionModule());
                 install(new EmailModule());
                 install(new TemplateModule());
-
+                install(new NotificationQueueModule());
                 final AccountInternalApi accountApi = Mockito.mock(AccountInternalApi.class);
                 bind(AccountInternalApi.class).toInstance(accountApi);
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
index b45819d..5ffb060 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -39,7 +39,7 @@ public class AutoPayRetryService extends BaseRetryService implements RetryServic
                                final PaymentConfig config,
                                final PaymentProcessor paymentProcessor,
                                final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index d1b78cc..c2cda2d 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -47,16 +47,13 @@ public abstract class BaseRetryService implements RetryService {
     private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
 
     private final NotificationQueueService notificationQueueService;
-    private final PaymentConfig config;
     private final InternalCallContextFactory internalCallContextFactory;
 
     private NotificationQueue retryQueue;
 
     public BaseRetryService(final NotificationQueueService notificationQueueService,
-                            final PaymentConfig config,
                             final InternalCallContextFactory internalCallContextFactory) {
         this.notificationQueueService = notificationQueueService;
-        this.config = config;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -75,8 +72,7 @@ public abstract class BaseRetryService implements RetryService {
                                                                               final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                                                                               retry(key.getUuidKey(), callContext);
                                                                           }
-                                                                      },
-                                                                      config);
+                                                                      });
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 250987b..c9a80a1 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -45,7 +45,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
                                      final PaymentConfig config,
                                      final PaymentProcessor paymentProcessor,
                                      final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 9217abd..2e76ea2 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -43,10 +43,9 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
 
     @Inject
     public PluginFailureRetryService(final NotificationQueueService notificationQueueService,
-                                     final PaymentConfig config,
                                      final PaymentProcessor paymentProcessor,
                                      final InternalCallContextFactory internalCallContextFactory) {
-        super(notificationQueueService, config, internalCallContextFactory);
+        super(notificationQueueService, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
     }
 
diff --git a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
index a01dda5..0da7d2b 100644
--- a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
@@ -16,8 +16,6 @@
 
 package com.ning.billing.payment.glue;
 
-import static org.testng.Assert.assertNotNull;
-
 import java.io.IOException;
 import java.net.URL;
 import java.util.Properties;
@@ -28,7 +26,6 @@ import org.skife.config.SimplePropertyConfigSource;
 import org.skife.jdbi.v2.IDBI;
 
 import com.ning.billing.ObjectType;
-import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.mock.glue.MockInvoiceModule;
 import com.ning.billing.mock.glue.MockNotificationQueueModule;
 import com.ning.billing.payment.dao.MockPaymentDao;
@@ -37,6 +34,7 @@ import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
 import com.ning.billing.util.callcontext.CallContextSqlDao;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.callcontext.MockCallContextSqlDao;
+import com.ning.billing.util.config.PaymentConfig;
 import com.ning.billing.util.globallocker.GlobalLocker;
 import com.ning.billing.util.globallocker.MockGlobalLocker;
 import com.ning.billing.util.glue.BusModule;
@@ -46,9 +44,11 @@ import com.ning.billing.util.svcapi.tag.TagInternalApi;
 import com.ning.billing.util.tag.Tag;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+
+import static org.testng.Assert.assertNotNull;
 
 public class PaymentTestModuleWithMocks extends PaymentModule {
+
     public static final String PLUGIN_TEST_NAME = "my-mock";
 
     private void loadSystemPropertiesFromClasspath(final String resource) {
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index 57122fe..f2fa6ab 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -59,6 +59,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
     private final String hostname;
     private final InternalCallContextFactory internalCallContextFactory;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -95,16 +97,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
         this.hostname = Hostname.get();
         this.internalCallContextFactory = internalCallContextFactory;
+        this.isStarted = false;
     }
 
     @Override
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     @Override
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -130,6 +135,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
diff --git a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
index aad7427..30c0ebd 100644
--- a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
@@ -19,14 +19,5 @@ package com.ning.billing.util.config;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
-public interface EntitlementConfig extends NotificationConfig, KillbillConfig {
-    @Override
-    @Config("killbill.entitlement.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.entitlement.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
+public interface EntitlementConfig extends KillbillConfig {
 }
diff --git a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
index a1f8d7b..73db4fc 100644
--- a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
@@ -19,17 +19,7 @@ package com.ning.billing.util.config;
 import org.skife.config.Config;
 import org.skife.config.Default;
 
-public interface InvoiceConfig extends NotificationConfig, KillbillConfig {
-    @Override
-    @Config("killbill.invoice.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.invoice.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
-
+public interface InvoiceConfig extends KillbillConfig {
     @Config("killbill.invoice.maxNumberOfMonthsInFuture")
     @Default("36")
     public int getNumberOfMonthsInFuture();
diff --git a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
index 16a6e4b..64ae12d 100644
--- a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
@@ -22,7 +22,7 @@ import org.skife.config.Config;
 import org.skife.config.Default;
 
 
-public interface PaymentConfig extends NotificationConfig, KillbillConfig {
+public interface PaymentConfig extends KillbillConfig {
     @Config("killbill.payment.provider.default")
     @Default("noop")
     public String getDefaultPaymentProvider();
@@ -43,16 +43,6 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig {
     @Default("8")
     public int getPluginFailureRetryMaxAttempts();
 
-    @Override
-    @Config("killbill.payment.engine.notifications.sleep")
-    @Default("500")
-    public long getSleepTimeMs();
-
-    @Override
-    @Config("killbill.payment.engine.notifications.off")
-    @Default("false")
-    public boolean isNotificationProcessingOff();
-
     @Config("killbill.payment.off")
     @Default("false")
     public boolean isPaymentOff();
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
index 557a6e1..7e4e1c9 100644
--- a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -16,15 +16,25 @@
 
 package com.ning.billing.util.glue;
 
+import org.skife.config.ConfigurationObjectFactory;
+
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.AbstractModule;
 
 public class NotificationQueueModule extends AbstractModule {
 
+
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
+    }
     @Override
     protected void configure() {
         bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 66d6688..0a295ae 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,7 +55,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     public List<Notification> getReadyNotifications(@Bind("now") Date now,
                                                     @Bind("owner") String owner,
                                                     @Bind("max") int max,
-                                                    @Bind("queueName") String queueName,
                                                     @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index b054d93..2780faa 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,14 +17,10 @@
 package com.ning.billing.util.notificationq;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
-
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -32,10 +28,11 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.config.NotificationConfig;
+
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
@@ -43,51 +40,41 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 
-public class DefaultNotificationQueue extends NotificationQueueBase {
+public class DefaultNotificationQueue implements NotificationQueue {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
     private final IDBI dbi;
     private final NotificationSqlDao dao;
-    private final InternalCallContextFactory internalCallContextFactory;
+    private final String hostname;
 
-    public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
-                                    final NotificationQueueHandler handler, final NotificationConfig config,
-                                    final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, svcName, queueName, handler, config);
-        this.dbi = dbi;
-        this.dao = dbi.onDemand(NotificationSqlDao.class);
-        this.internalCallContextFactory = internalCallContextFactory;
-    }
+    private final String svcName;
+    private final String queueName;
 
-    @Override
-    public int doProcessEvents() {
-        logDebug("ENTER doProcessEvents");
-        // Finding and claiming notifications is not done per tenant (yet?)
-        final List<Notification> notifications = getReadyNotifications(createCallContext(null, null, null));
-        if (notifications.size() == 0) {
-            logDebug("EXIT doProcessEvents");
-            return 0;
-        }
+    private final ObjectMapper objectMapper;
 
-        logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
-        int result = 0;
-        for (final Notification cur : notifications) {
-            getNbProcessedEvents().incrementAndGet();
-            logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            result++;
-            clearNotification(cur, createCallContext(cur.getUserToken(), cur.getTenantRecordId(), cur.getAccountRecordId()));
-            logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-        }
+    private final NotificationQueueHandler handler;
+
+    private final NotificationQueueService notificationQueueService;
 
-        return result;
+    private volatile boolean isStarted;
+
+    public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+                                    final IDBI dbi, final NotificationQueueService notificationQueueService) {
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.dbi = dbi;
+        this.dao = dbi.onDemand(NotificationSqlDao.class);
+        this.hostname = Hostname.get();
+        this.notificationQueueService = notificationQueueService;
+        this.objectMapper = new ObjectMapper();
     }
 
+
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime,
                                          final NotificationKey notificationKey,
@@ -109,54 +96,14 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
                                                   final NotificationSqlDao thisDao,
                                                   final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        // Create a new user token. This will be used in the future, when this notification is triggered, to trace
-        // generated bus events
         final UUID futureUserToken = UUID.randomUUID();
-        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
-                                                                  context.getUserToken(), futureUserToken, futureNotificationTime,
-                                                                  context.getAccountRecordId(), context.getTenantRecordId());
-        thisDao.insertNotification(notification, context);
-    }
 
-    private void clearNotification(final Notification cleared, final InternalCallContext context) {
-        dao.clearNotification(cleared.getId().toString(), getHostname(), context);
+        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
+                                                                  context.getUserToken(), futureUserToken, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
+        thisDao.insertNotification(notification, context);
     }
 
-    private List<Notification> getReadyNotifications(final InternalCallContext context) {
-        final Date now = getClock().getUTCNow().toDate();
-        final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
-        final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
-
-        final List<Notification> claimedNotifications = new ArrayList<Notification>();
-        for (final Notification cur : input) {
-            logDebug("about to claim notification %s,  key = %s for time %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-
-            final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
-            logDebug("claimed notification %s, key = %s for time %s result = %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
 
-            if (claimed) {
-                claimedNotifications.add(cur);
-                dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
-            }
-        }
-
-        for (final Notification cur : claimedNotifications) {
-            if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
-                log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
-            }
-        }
-
-        return claimedNotifications;
-    }
-
-    private void logDebug(final String format, final Object... args) {
-        if (log.isDebugEnabled()) {
-            final String realDebug = String.format(format, args);
-            log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
-        }
-    }
 
     @Override
     public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
@@ -190,7 +137,42 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         dao.removeNotification(notificationId.toString(), context);
     }
 
-    private InternalCallContext createCallContext(final UUID userToken, @Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
-        return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+    @Override
+    public String getFullQName() {
+        return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
+    }
+
+    @Override
+    public String getServiceName() {
+        return svcName;
+    }
+
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
+
+    @Override
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        notificationQueueService.startQueue();
+        isStarted = true;
+    }
+
+    @Override
+    public void stopQueue() {
+        // Order matters...
+        isStarted = false;
+        notificationQueueService.stopQueue();
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
     }
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..d56023c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 import com.google.inject.Inject;
 
 public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
     private final IDBI dbi;
-    private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
-    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
-        super(clock);
+    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
+                                           final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi, internalCallContextFactory);
         this.dbi = dbi;
-        this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     protected NotificationQueue createNotificationQueueInternal(final String svcName,
                                                                 final String queueName,
-                                                                final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+                                                                final NotificationQueueHandler handler) {
+        return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 22cb36a..58b49c4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.queue.QueueLifecycle;
 
 public interface NotificationQueue extends QueueLifecycle {
@@ -66,13 +67,6 @@ public interface NotificationQueue extends QueueLifecycle {
     public void removeNotification(final UUID notificationId,
                                    final InternalCallContext context);
 
-    /**
-     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
-     * In which case, it will callback users for all the ready notifications.
-     *
-     * @return the number of entries we processed
-     */
-    public int processReadyNotification();
 
     /**
      * @return the name of that queue
@@ -88,4 +82,6 @@ public interface NotificationQueue extends QueueLifecycle {
      * @return the queue name associated
      */
     public String getQueueName();
+
+    public NotificationQueueHandler getHandler();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index f83a1ec..cae7775 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -21,8 +21,9 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
 
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
 
     public interface NotificationQueueHandler {
 
@@ -61,12 +62,11 @@ public interface NotificationQueueService {
      * @param svcName   the name of the service using that queue
      * @param queueName a name for that queue (unique per service)
      * @param handler   the handler required for notifying the caller of state change
-     * @param config    the notification queue configuration
      * @return a new NotificationQueue
      * @throws com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists
      *          is the queue associated with that service and name already exits
      */
-    public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+    public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler)
             throws NotificationQueueAlreadyExists;
 
     /**
@@ -92,8 +92,7 @@ public interface NotificationQueueService {
             throws NoSuchNotificationQueue;
 
     /**
-     * @param services
      * @return the number of processed notifications
      */
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+    public int triggerManualQueueProcessing(final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..99dd74f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -17,39 +17,29 @@
 package com.ning.billing.util.notificationq;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.skife.jdbi.v2.IDBI;
 
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 
-import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
-    protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
 
-    protected final Clock clock;
-
-    private final Map<String, NotificationQueue> queues;
 
     @Inject
-    public NotificationQueueServiceBase(final Clock clock) {
-        this.clock = clock;
-        this.queues = new TreeMap<String, NotificationQueue>();
+    public NotificationQueueServiceBase(final Clock clock, final NotificationQueueConfig config, final IDBI dbi,
+                                        final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi, internalCallContextFactory);
     }
 
     @Override
     public NotificationQueue createNotificationQueue(final String svcName,
                                                      final String queueName,
-                                                     final NotificationQueueHandler handler,
-                                                     final NotificationConfig config) throws NotificationQueueAlreadyExists {
-        if (svcName == null || queueName == null || handler == null || config == null) {
+                                                     final NotificationQueueHandler handler) throws NotificationQueueAlreadyExists {
+        if (svcName == null || queueName == null || handler == null) {
             throw new RuntimeException("Need to specify all parameters");
         }
 
@@ -61,7 +51,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
                 throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
                                                                        svcName, queueName));
             }
-            result = createNotificationQueueInternal(svcName, queueName, handler, config);
+            result = createNotificationQueueInternal(svcName, queueName, handler);
             queues.put(compositeName, result);
         }
         return result;
@@ -96,33 +86,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         }
     }
 
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("NotificationQueueServiceBase");
+        sb.append("{queues=").append(queues);
+        sb.append('}');
+        return sb.toString();
+    }
+
+
     //
     // Test ONLY
     //
     @Override
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+    public int triggerManualQueueProcessing(final Boolean keepRunning) {
 
         int result = 0;
 
-        List<NotificationQueue> manualQueues = null;
-        if (services == null) {
-            manualQueues = new ArrayList<NotificationQueue>(queues.values());
-        } else {
-            final Joiner join = Joiner.on(",");
-            join.join(services);
-
-            log.info("Trigger manual processing for services {} ", join.toString());
-            manualQueues = new LinkedList<NotificationQueue>();
-            synchronized (queues) {
-                for (final String svc : services) {
-                    addQueuesForService(manualQueues, svc);
-                }
-            }
-        }
+        List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
         for (final NotificationQueue cur : manualQueues) {
             int processedNotifications = 0;
             do {
-                processedNotifications = cur.processReadyNotification();
+                doProcessEventsWithLimit(1);
                 log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
                 result += processedNotifications;
             } while (keepRunning && processedNotifications > 0);
@@ -130,28 +116,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         return result;
     }
 
-    private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
-        for (final String cur : queues.keySet()) {
-            if (cur.startsWith(svcName)) {
-                result.add(queues.get(cur));
-            }
-        }
-    }
-
     protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
-                                                                         String queueName, NotificationQueueHandler handler,
-                                                                         NotificationConfig config);
-
-    public static String getCompositeName(final String svcName, final String queueName) {
-        return svcName + ":" + queueName;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("NotificationQueueServiceBase");
-        sb.append("{queues=").append(queues);
-        sb.append('}');
-        return sb.toString();
-    }
+                                                                         String queueName, NotificationQueueHandler handler);
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     private final int nbThreads;
     private final Executor executor;
-    private final String svcName;
+    private final String svcQName;
     private final long sleepTimeMs;
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
     protected final ObjectMapper objectMapper;
-    
-    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+    public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
-        this.svcName = svcName;
+        this.svcQName = svcQName;
         this.sleepTimeMs = config.getSleepTimeMs();
         this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
     }
 
+
     @Override
     public void startQueue() {
 
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
 
         log.info(String.format("%s: Starting with %d threads",
-                               svcName, nbThreads));
+                               svcQName, nbThreads));
 
         for (int i = 0; i < nbThreads; i++) {
             executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                 public void run() {
 
                     log.info(String.format("%s: Thread %s [%d] starting",
-                                           svcName,
+                                           svcQName,
                                            Thread.currentThread().getName(),
                                            Thread.currentThread().getId()));
 
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                                 doProcessEvents();
                             } catch (Exception e) {
                                 log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...",
-                                                       svcName,
+                                                       svcQName,
                                                        Thread.currentThread().getName(),
                                                        Thread.currentThread().getId()), e);
                             }
                             sleepALittle();
                         }
                     } catch (InterruptedException e) {
-                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
                     } catch (Throwable e) {
-                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
                     } finally {
 
-                        log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
                         synchronized (thePersistentQ) {
                             curActiveThreads--;
                         }
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
             if (!success) {
 
-                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             } else {
-                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             }
         } catch (InterruptedException e) {
-            log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+            log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
         }
     }
 
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             }
 
         } catch (InterruptedException ignore) {
-            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
         } finally {
             if (remaining > 0) {
-                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             } else {
-                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             }
             curActiveThreads = 0;
         }
     }
-    
+
     protected <T> T deserializeEvent(final String className, final String json) {
         try {
             final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         }
     }
 
+    public abstract int doProcessEvents();
 
+    public abstract boolean isStarted();
 
-
-    @Override
-    public abstract int doProcessEvents();
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
      */
     public void stopQueue();
 
-    /**
-     * Processes event from queue
-     */
-    public int doProcessEvents();
+    public boolean isStarted();
 }
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 647fa9c..74aaee2 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -142,7 +142,7 @@ CREATE TABLE notifications (
     PRIMARY KEY(record_id)
 ) ENGINE=innodb;
 CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
 CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 3b3aefc..e55a423 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -24,7 +24,6 @@ getReadyNotifications() ::= <<
     FORCE INDEX (idx_comp_where)
     where
       effective_date \<= :now
-      and queue_name = :queueName
       and processing_state != 'PROCESSED'
       and processing_state != 'REMOVED'
       and (processing_owner IS NULL OR processing_available_date \<= :now)
diff --git a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
index aed75f5..93ef001 100644
--- a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
+++ b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
@@ -16,16 +16,24 @@
 
 package com.ning.billing.mock.glue;
 
+import org.skife.config.ConfigurationObjectFactory;
+
 import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 
 import com.google.inject.AbstractModule;
 
 public class MockNotificationQueueModule extends AbstractModule {
 
+    protected void configureNotificationQueueConfig() {
+        final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+        bind(NotificationQueueConfig.class).toInstance(config);
+    }
     @Override
     protected void configure() {
         bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+        configureNotificationQueueConfig();
     }
 
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 264f10c..6b07167 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -73,7 +73,7 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
 
         Thread.sleep(1000);
         final DateTime now = new DateTime();
-        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
         assertNotNull(notifications);
         assertEquals(notifications.size(), 1);
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 2ed6079..ac267a7 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -25,23 +25,38 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 
+import com.ning.billing.util.Hostname;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private final String hostname;
     private final TreeSet<Notification> notifications;
+    private final Clock clock;
+    private final String svcName;
+    private final String queueName;
+    private final NotificationQueueHandler handler;
+    private final MockNotificationQueueService queueService;
+
+    private volatile boolean isStarted;
+
+    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final MockNotificationQueueService mockNotificationQueueService) {
+
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.clock = clock;
+        this.hostname = Hostname.get();
+        this.queueService = mockNotificationQueueService;
 
-    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
-        super(clock, svcName, queueName, handler, config);
         notifications = new TreeSet<Notification>(new Comparator<Notification>() {
             @Override
             public int compare(final Notification o1, final Notification o2) {
@@ -57,7 +72,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, context.getUserToken(),
+        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, context.getUserToken(),
                                                                   UUID.randomUUID(), futureNotificationTime, null, 0L);
         synchronized (notifications) {
             notifications.add(notification);
@@ -70,81 +85,112 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         recordFutureNotification(futureNotificationTime, notificationKey, context);
     }
 
-    public List<Notification> getPendingEvents() {
-        final List<Notification> result = new ArrayList<Notification>();
 
+    @Override
+    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+        final List<Notification> toClearNotifications = new ArrayList<Notification>();
         for (final Notification notification : notifications) {
-            if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
-                result.add(notification);
+            if (notification.getNotificationKey().equals(key.toString())) {
+                toClearNotifications.add(notification);
             }
         }
 
-        return result;
+        synchronized (notifications) {
+            if (toClearNotifications.size() > 0) {
+                notifications.removeAll(toClearNotifications);
+            }
+        }
     }
 
     @Override
-    public int doProcessEvents() {
-        final int result;
-        final List<Notification> processedNotifications = new ArrayList<Notification>();
-        final List<Notification> oldNotifications = new ArrayList<Notification>();
-
-        final List<Notification> readyNotifications = new ArrayList<Notification>();
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+        /*
+        final List<Notification> result = new ArrayList<Notification>();
         synchronized (notifications) {
-            for (final Notification cur : notifications) {
-                if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
-                    readyNotifications.add(cur);
+            for (Notification cur : notifications) {
+                if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+                    result.add(cur);
                 }
             }
         }
+        return result;
+        */
+        return null;
+    }
 
-        result = readyNotifications.size();
-        for (final Notification cur : readyNotifications) {
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
-                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
-                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
-                                                                                      cur.getNotificationKey(), cur.getUserToken(), UUID.randomUUID(), cur.getEffectiveDate(),
-                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
-            oldNotifications.add(cur);
-            processedNotifications.add(processedNotification);
-        }
 
+    @Override
+    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
         synchronized (notifications) {
-            if (oldNotifications.size() > 0) {
-                notifications.removeAll(oldNotifications);
-            }
-
-            if (processedNotifications.size() > 0) {
-                notifications.addAll(processedNotifications);
+            for (Notification cur : notifications) {
+                if (cur.getId().equals(notificationId)) {
+                    notifications.remove(cur);
+                    break;
+                }
             }
         }
 
-        return result;
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
-        final List<Notification> toClearNotifications = new ArrayList<Notification>();
-        for (final Notification notification : notifications) {
-            if (notification.getNotificationKey().equals(key.toString())) {
-                toClearNotifications.add(notification);
-            }
-        }
+    public String getFullQName() {
+        return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+    }
 
-        synchronized (notifications) {
-            if (toClearNotifications.size() > 0) {
-                notifications.removeAll(toClearNotifications);
-            }
-        }
+    @Override
+    public String getServiceName() {
+        return svcName;
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        return null;
+    public String getQueueName() {
+        return queueName;
     }
 
     @Override
-    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        isStarted = true;
+        queueService.startQueue();
+    }
+
+    @Override
+    public void stopQueue() {
+        isStarted = false;
+        queueService.stopQueue();
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
+
+    public List<Notification> getReadyNotifications() {
+        final int result;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        final List<Notification> readyNotifications = new ArrayList<Notification>();
+        synchronized (notifications) {
+            for (final Notification cur : notifications) {
+                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+                    readyNotifications.add(cur);
+                }
+            }
+        }
+        return readyNotifications;
     }
+
+    public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded) {
+        synchronized (notifications) {
+            notifications.removeAll(toBeremoved);
+            notifications.addAll(toBeAdded);
+        }
+    }
+
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..0284747 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
 
 package com.ning.billing.util.notificationq;
 
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import com.google.inject.Inject;
 
 public class MockNotificationQueueService extends NotificationQueueServiceBase {
 
     @Inject
-    public MockNotificationQueueService(final Clock clock) {
-        super(clock);
+    public MockNotificationQueueService(final Clock clock, final NotificationQueueConfig config) {
+        super(clock, config, null, null);
+    }
+
+
+    @Override
+    protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+                                                                final NotificationQueueHandler handler) {
+        return new MockNotificationQueue(clock, svcName, queueName, handler, this);
     }
 
+
     @Override
-    protected NotificationQueue createNotificationQueueInternal(final String svcName,
-                                                                final String queueName, final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+    public int doProcessEvents() {
+
+        int result = 0;
+
+        for (NotificationQueue cur : queues.values()) {
+            result += doProcessEventsForQueue((MockNotificationQueue) cur);
+        }
+        return result;
+    }
+
+    private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+        int result = 0;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        List<Notification> readyNotifications = queue.getReadyNotifications();
+        for (final Notification cur : readyNotifications) {
+            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+            queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+                                                                                      cur.getNotificationKey(), cur.getUserToken(), cur.getFutureUserToken(), cur.getEffectiveDate(),
+                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
+            oldNotifications.add(cur);
+            processedNotifications.add(processedNotification);
+            result++;
+        }
+
+        queue.markProcessedNotifications(oldNotifications, processedNotifications);
+        return result;
     }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 03ea6ae..09be990 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -36,10 +36,8 @@ import org.testng.annotations.Test;
 import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -60,6 +58,7 @@ import static com.jayway.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.testng.Assert.assertEquals;
 
+
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
@@ -76,6 +75,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     @Inject
     private Clock clock;
 
+    @Inject
+    NotificationQueueService queueService;
+
     private int eventsReceived;
 
     private static final class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
@@ -96,6 +98,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         public int compareTo(TestNotificationKey arg0) {
             return value.compareTo(arg0.value);
         }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(value);
+            return sb.toString();
+        }
     }
 
     @BeforeSuite(groups = "slow")
@@ -123,20 +132,20 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        log.info("Handler received key: " + notificationKey);
-
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 1, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
+
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "foo",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey);
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             });
 
         queue.startQueue();
 
@@ -149,9 +158,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         expectedNotifications.put(notificationKey, Boolean.FALSE);
 
         // Insert dummy to be processed in 2 sec'
-        entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+        entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>()
+
+        {
             @Override
-            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+            public Void inTransaction(
+                    final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws
+                                                                                               Exception {
 
                 entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
                 queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKey, internalCallContext);
@@ -159,40 +172,56 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
                 return null;
             }
-        });
+        }
+
+                                                    );
 
         // Move time in the future after the notification effectiveDate
-        ((ClockMock) clock).setDeltaFromReality(3000);
+        ((ClockMock) clock).
+
+                                   setDeltaFromReality(3000);
 
         // Notification should have kicked but give it at least a sec' for thread scheduling
-        await().atMost(1, MINUTES).until(new Callable<Boolean>() {
-            @Override
-            public Boolean call() throws Exception {
-                return expectedNotifications.get(notificationKey);
-            }
-        });
+        await()
+
+                .
+
+                        atMost(1, MINUTES)
+
+                .
+
+                        until(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws
+                                                  Exception {
+                                return expectedNotifications.get(notificationKey);
+                            }
+                        }
+
+                             );
 
         queue.stopQueue();
         Assert.assertTrue(expectedNotifications.get(notificationKey));
     }
 
     @Test(groups = "slow")
-    public void testManyNotifications() throws InterruptedException {
+    public void testManyNotifications() throws Exception {
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
 
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "many",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey.toString());
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             });
         queue.startQueue();
 
         final DateTime now = clock.getUTCNow();
@@ -205,7 +234,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             final DummyObject obj = new DummyObject("foo", key);
             final int currentIteration = i;
 
-            final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+            final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
             expectedNotifications.put(notificationKey, Boolean.FALSE);
 
             entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -243,7 +272,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
                     success = true;
                     break;
                 }
-                //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+                log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
                 expectedNotifications.wait(1000);
             }
         } while (nbTry-- > 0);
@@ -269,40 +298,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
         final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
 
-        final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
 
-        final NotificationConfig config = new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return false;
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return 10;
-            }
-        };
-
-        final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
+        final NotificationQueue queueFred = queueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Fred received key: " + notificationKey);
                 expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
-        },
-                                                                                             config);
+        });
 
-        final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+        final NotificationQueue queueBarney = queueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Barney received key: " + notificationKey);
                 expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
-        },
-                                                                                               config);
-
+        });
         queueFred.startQueue();
         //		We don't start Barney so it can never pick up notifications
 
@@ -353,40 +366,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
     }
 
-    NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
-        return new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return off;
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return sleepTime;
-            }
-        };
-    }
 
     @Test(groups = "slow")
-    public void testRemoveNotifications() throws InterruptedException {
+    public void testRemoveNotifications() throws Exception {
         final UUID key = UUID.randomUUID();
         final NotificationKey notificationKey = new TestNotificationKey(key.toString());
         final UUID key2 = UUID.randomUUID();
         final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
-                                                                                        log.info("Received notification with key: " + notificationKey);
-                                                                                        eventsReceived++;
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
 
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "remove",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+                                                                                         log.info("Received notification with key: " + notificationKey);
+                                                                                         eventsReceived++;
+                                                                                     }
+                                                                                 }
+                                                                             });
         queue.startQueue();
 
         final DateTime start = clock.getUTCNow().plusHours(1);
@@ -424,11 +423,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         queue.stopQueue();
     }
 
+
+    static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
+        return new NotificationQueueConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return off;
+            }
+
+            @Override
+            public long getSleepTimeMs() {
+                return sleepTime;
+            }
+        };
+    }
+
     public static class TestNotificationQueueModule extends AbstractModule {
 
         @Override
         protected void configure() {
-            bind(Clock.class).to(ClockMock.class);
+            bind(Clock.class).to(ClockMock.class).asEagerSingleton();
 
             final MysqlTestingHelper helper = KillbillTestSuiteWithEmbeddedDB.getMysqlTestingHelper();
             bind(MysqlTestingHelper.class).toInstance(helper);
@@ -436,6 +450,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             bind(IDBI.class).toInstance(dbi);
             final IDBI otherDbi = helper.getDBI();
             bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+            bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+            bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
         }
     }
 }