killbill-memoizeit

subscription: send effective events from transactions Relates

1/13/2016 6:34:49 PM

Details

diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java b/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
index a4fefd0..200a0fc 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
@@ -81,7 +81,7 @@ public interface SubscriptionBaseApiService {
                                          String priceList, List<PlanPhasePriceOverride> overrides, BillingActionPolicy policy, CallContext context)
             throws SubscriptionBaseApiException;
 
-    public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException;
+    public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException;
 
     public PlanChangeResult getPlanChangeResult(final DefaultSubscriptionBase subscription, final String productName,
                                                 final BillingPeriod term, final String priceList, final DateTime effectiveDate, TenantContext context) throws SubscriptionBaseApiException;
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
index cc18c43..cb7804e 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
@@ -519,13 +519,14 @@ public class DefaultSubscriptionBaseApiService implements SubscriptionBaseApiSer
     }
 
     @Override
-    public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException {
+    public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
+        final Product baseProduct = (subscription.getState() == EntitlementState.CANCELLED) ? null : subscription.getCurrentPlan().getProduct();
+
         final List<SubscriptionBaseEvent> cancelEvents = new LinkedList<SubscriptionBaseEvent>();
-        final InternalCallContext internalCallContext = createCallContextFromBundleId(bundleId, context);
-        final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, bundleId, effectiveDate, internalCallContext);
-        if (!subscriptionsToBeCancelled.isEmpty()) {
-            dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, internalCallContext);
-        }
+        final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
+        final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, subscription.getBundleId(), event.getEffectiveDate(), internalCallContext);
+        dao.cancelSubscriptionsOnBasePlanEvent(subscription, event, subscriptionsToBeCancelled, cancelEvents, internalCallContext);
+
         return subscriptionsToBeCancelled.size();
     }
 
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
index 8b4fbe8..18dd082 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -23,10 +23,7 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.CatalogApiException;
-import org.killbill.billing.catalog.api.Product;
 import org.killbill.billing.catalog.api.ProductCategory;
-import org.killbill.billing.entitlement.api.Entitlement.EntitlementState;
-import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
 import org.killbill.billing.platform.api.LifecycleHandlerType;
 import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.subscription.alignment.PlanAligner;
@@ -41,12 +38,12 @@ import org.killbill.billing.subscription.events.SubscriptionBaseEvent;
 import org.killbill.billing.subscription.events.SubscriptionBaseEvent.EventType;
 import org.killbill.billing.subscription.events.phase.PhaseEvent;
 import org.killbill.billing.subscription.events.phase.PhaseEventData;
-import org.killbill.billing.subscription.events.user.ApiEvent;
 import org.killbill.billing.subscription.exceptions.SubscriptionBaseError;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.bus.api.BusEvent;
 import org.killbill.bus.api.PersistentBus;
 import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.killbill.clock.Clock;
@@ -74,9 +71,10 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
     private final PersistentBus eventBus;
     private final NotificationQueueService notificationQueueService;
     private final InternalCallContextFactory internalCallContextFactory;
-    private NotificationQueue subscriptionEventQueue;
     private final SubscriptionBaseApiService apiService;
 
+    private NotificationQueue subscriptionEventQueue;
+
     @Inject
     public DefaultSubscriptionBaseService(final Clock clock, final SubscriptionDao dao, final PlanAligner planAligner,
                                           final PersistentBus eventBus,
@@ -159,22 +157,24 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
                 return;
             }
 
-            //
-            // Do any internal processing on that event before we send the event to the bus
-            //
-            int theRealSeqId = seqId;
+            boolean eventSent = false;
             if (event.getType() == EventType.PHASE) {
-                onPhaseEvent(subscription, context);
+                eventSent = onPhaseEvent(subscription, event, context);
             } else if (event.getType() == EventType.API_USER && subscription.getCategory() == ProductCategory.BASE) {
                 final CallContext callContext = internalCallContextFactory.createCallContext(context);
-                theRealSeqId = onBasePlanEvent(subscription, (ApiEvent) event, callContext);
+                eventSent = onBasePlanEvent(subscription, event, callContext);
             }
 
-            final SubscriptionBaseTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
-            final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
-                                                                                                      context.getUserToken(),
-                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
-            eventBus.post(busEvent);
+            if (!eventSent) {
+                // Methods above invoking the DAO will send this event directly from the transaction
+                final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(event, seqId);
+                final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+                                                                                subscription.getAlignStartDate(),
+                                                                                context.getUserToken(),
+                                                                                context.getAccountRecordId(),
+                                                                                context.getTenantRecordId());
+                eventBus.post(busEvent);
+            }
         } catch (final EventBusException e) {
             log.warn("Failed to post subscription event " + event, e);
         } catch (final CatalogApiException e) {
@@ -182,7 +182,7 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
         }
     }
 
-    private void onPhaseEvent(final DefaultSubscriptionBase subscription, final InternalCallContext context) {
+    private boolean onPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final InternalCallContext context) {
         try {
             final DateTime now = clock.getUTCNow();
             final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, context);
@@ -191,15 +191,18 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
                                                                                   nextTimedPhase.getPhase().getName(), nextTimedPhase.getStartPhase()) :
                                               null;
             if (nextPhaseEvent != null) {
-                dao.createNextPhaseEvent(subscription, nextPhaseEvent, context);
+                dao.createNextPhaseEvent(subscription, readyPhaseEvent, nextPhaseEvent, context);
+                return true;
             }
         } catch (final SubscriptionBaseError e) {
             log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
         }
+
+        return false;
     }
 
-    private int onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final ApiEvent event, final CallContext context) throws CatalogApiException {
-        final Product baseProduct = (baseSubscription.getState() == EntitlementState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-        return apiService.cancelAddOnsIfRequired(baseProduct, baseSubscription.getBundleId(), event.getEffectiveDate(), context);
+    private boolean onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
+        apiService.cancelAddOnsIfRequiredOnBasePlanEvent(baseSubscription, event, context);
+        return true;
     }
 }
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
index b6768f5..1e6c118 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
@@ -45,7 +45,6 @@ import org.killbill.billing.catalog.api.Plan;
 import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.api.SubscriptionApiException;
 import org.killbill.billing.entity.EntityPersistenceException;
-import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
 import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.subscription.api.migration.AccountMigrationData;
@@ -86,6 +85,7 @@ import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import org.killbill.bus.api.BusEvent;
 import org.killbill.bus.api.PersistentBus;
 import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.killbill.clock.Clock;
@@ -405,20 +405,21 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
     }
 
     @Override
-    public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
+    public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final SubscriptionBaseEvent nextPhaseEvent, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                 final SubscriptionEventSqlDao transactional = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class);
                 final UUID subscriptionId = subscription.getId();
                 cancelNextPhaseEventFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
-                transactional.create(new SubscriptionEventModelDao(nextPhase), context);
+                transactional.create(new SubscriptionEventModelDao(nextPhaseEvent), context);
                 recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                        nextPhase.getEffectiveDate(),
-                                                        new SubscriptionNotificationKey(nextPhase.getId()), context);
+                                                        nextPhaseEvent.getEffectiveDate(),
+                                                        new SubscriptionNotificationKey(nextPhaseEvent.getId()), context);
 
-                // Notify the Bus of the requested change
-                notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhase, SubscriptionBaseTransitionType.PHASE, context);
+                // Notify the Bus
+                notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhaseEvent, SubscriptionBaseTransitionType.PHASE, context);
+                notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, readyPhaseEvent, 0, context);
 
                 return null;
             }
@@ -561,6 +562,19 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
     }
 
     @Override
+    public void cancelSubscriptionsOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                cancelSubscriptionsFromTransaction(entitySqlDaoWrapperFactory, subscriptions, cancelEvents, context);
+                // Make sure to always send the event, even if there were no subscriptions to cancel
+                notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, subscriptions.size(), context);
+                return null;
+            }
+        });
+    }
+
+    @Override
     public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
@@ -1046,12 +1060,12 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
     }
 
     //
-    // Either records a notfication or sends a bus event is operation is immediate
+    // Either records a notification or sends a bus event if operation is immediate
     //
     private void recordBusOrFutureNotificationFromTransaction(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final boolean busEvent,
                                                               final int seqId, final InternalCallContext context) {
         if (busEvent) {
-            notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+            rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
         } else {
             recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
                                                     event.getEffectiveDate(),
@@ -1060,25 +1074,29 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
         }
     }
 
-    //
-    // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
-    // - CREATE,
-    // - IMM CANCEL or CHANGE
-    //
-    private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
-                                                     final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+    // Sends bus notification for event on effective date -- only used for operation that happen immediately
+    private void rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
+                                                                           final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
         try {
             final DefaultSubscriptionBase upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent, context);
+            notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, upToDateSubscription, immediateEvent, seqId, context);
+        } catch (final CatalogApiException e) {
+            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
+        }
+    }
 
-            final SubscriptionBaseTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
-            final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
-                                                                                                      context.getUserToken(),
-                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
+    private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
+                                                     final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+        try {
+            final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(immediateEvent, seqId);
+            final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+                                                                            subscription.getAlignStartDate(),
+                                                                            context.getUserToken(),
+                                                                            context.getAccountRecordId(),
+                                                                            context.getTenantRecordId());
 
             eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getHandle().getConnection());
-        } catch (EventBusException e) {
-            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
-        } catch (CatalogApiException e) {
+        } catch (final EventBusException e) {
             log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
         }
     }
@@ -1087,7 +1105,7 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
                                             final SubscriptionBaseEvent nextEvent, final SubscriptionBaseTransitionType transitionType, final InternalCallContext context) {
         try {
             eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent, transitionType, context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken()), entitySqlDaoWrapperFactory.getHandle().getConnection());
-        } catch (EventBusException e) {
+        } catch (final EventBusException e) {
             log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
         }
     }
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
index 12178d8..487fae3 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
@@ -69,7 +69,7 @@ public interface SubscriptionDao extends EntityDao<SubscriptionBundleModelDao, S
     public void updateChargedThroughDate(DefaultSubscriptionBase subscription, InternalCallContext context);
 
     // Event apis
-    public void createNextPhaseEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent nextPhase, InternalCallContext context);
+    public void createNextPhaseEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent readyPhaseEvent, SubscriptionBaseEvent nextPhase, InternalCallContext context);
 
     public SubscriptionBaseEvent getEventById(UUID eventId, InternalTenantContext context);
 
@@ -86,6 +86,8 @@ public interface SubscriptionDao extends EntityDao<SubscriptionBundleModelDao, S
 
     public void recreateSubscription(DefaultSubscriptionBase subscription, List<SubscriptionBaseEvent> recreateEvents, InternalCallContext context);
 
+    public void cancelSubscriptionsOnBasePlanEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent event, List<DefaultSubscriptionBase> subscriptions, List<SubscriptionBaseEvent> cancelEvents, InternalCallContext context);
+
     public void cancelSubscriptions(List<DefaultSubscriptionBase> subscriptions, List<SubscriptionBaseEvent> cancelEvents, InternalCallContext context);
 
     public void uncancelSubscription(DefaultSubscriptionBase subscription, List<SubscriptionBaseEvent> uncancelEvents, InternalCallContext context);
diff --git a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
index 57113b5..f7947f5 100644
--- a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
+++ b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
@@ -42,9 +42,11 @@ import org.killbill.billing.subscription.api.migration.AccountMigrationData;
 import org.killbill.billing.subscription.api.migration.AccountMigrationData.BundleMigrationData;
 import org.killbill.billing.subscription.api.migration.AccountMigrationData.SubscriptionMigrationData;
 import org.killbill.billing.subscription.api.transfer.TransferCancelData;
+import org.killbill.billing.subscription.api.user.DefaultEffectiveSubscriptionEvent;
 import org.killbill.billing.subscription.api.user.DefaultSubscriptionBase;
 import org.killbill.billing.subscription.api.user.DefaultSubscriptionBaseBundle;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseBundle;
+import org.killbill.billing.subscription.api.user.SubscriptionBaseTransitionData;
 import org.killbill.billing.subscription.api.user.SubscriptionBuilder;
 import org.killbill.billing.subscription.engine.core.DefaultSubscriptionBaseService;
 import org.killbill.billing.subscription.engine.core.SubscriptionNotificationKey;
@@ -57,6 +59,9 @@ import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Pagination;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import org.killbill.billing.util.entity.dao.MockEntityDaoBase;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.killbill.clock.Clock;
 import org.killbill.notificationq.api.NotificationEvent;
 import org.killbill.notificationq.api.NotificationQueue;
@@ -78,18 +83,21 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
     private final MockNonEntityDao mockNonEntityDao;
     private final Clock clock;
     private final NotificationQueueService notificationQueueService;
+    private final PersistentBus eventBus;
     private final CatalogService catalogService;
 
     @Inject
     public MockSubscriptionDaoMemory(final MockNonEntityDao mockNonEntityDao,
                                      final Clock clock,
                                      final NotificationQueueService notificationQueueService,
+                                     final PersistentBus eventBus,
                                      final CatalogService catalogService) {
         super();
         this.mockNonEntityDao = mockNonEntityDao;
         this.clock = clock;
         this.catalogService = catalogService;
         this.notificationQueueService = notificationQueueService;
+        this.eventBus = eventBus;
         this.bundles = new ArrayList<SubscriptionBaseBundle>();
         this.subscriptions = new ArrayList<SubscriptionBase>();
         this.events = new TreeSet<SubscriptionBaseEvent>();
@@ -305,9 +313,10 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
     }
 
     @Override
-    public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
+    public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
         cancelNextPhaseEvent(subscription.getId(), context);
         insertEvent(nextPhase, context);
+        notifyBusOfEffectiveImmediateChange(subscription, readyPhaseEvent, 0, context);
     }
 
     private SubscriptionBase buildSubscription(final DefaultSubscriptionBase in, final InternalTenantContext context) {
@@ -341,6 +350,12 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
     }
 
     @Override
+    public void cancelSubscriptionsOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
+        cancelSubscriptions(subscriptions, cancelEvents, context);
+        notifyBusOfEffectiveImmediateChange(subscription, event, subscriptions.size(), context);
+    }
+
+    @Override
     public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
         synchronized (events) {
             for (int i = 0; i < subscriptions.size(); i++) {
@@ -493,6 +508,21 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
         }
     }
 
+    private void notifyBusOfEffectiveImmediateChange(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+        try {
+            final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(immediateEvent, seqId);
+            final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+                                                                            subscription.getAlignStartDate(),
+                                                                            context.getUserToken(),
+                                                                            context.getAccountRecordId(),
+                                                                            context.getTenantRecordId());
+
+            eventBus.post(busEvent);
+        } catch (final EventBusException e) {
+            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
+        }
+    }
+
     @Override
     public Iterable<SubscriptionBaseEvent> getFutureEventsForAccount(final InternalTenantContext context) {
         return null;