killbill-memoizeit

Remove dependency on notificationQ when requests are IMM.

11/30/2012 8:27:54 PM

Details

diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index d2c2802..c80e303 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -106,7 +106,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
         final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
         final EntitlementDao entitlementDao = new DefaultEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
         final PlanAligner planAligner = new PlanAligner(catalogService);
-        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
+        final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, addonUtils, internalCallContextFactory);
         entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, apiService, clock, catalogService);
         entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, addonUtils, internalCallContextFactory);
         tagDao = new BusinessTagDao(accountTagSqlDao, invoicePaymentTagSqlDao, invoiceTagSqlDao, subscriptionTransitionTagSqlDao,
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
index 49e4e3f..db0af82 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
@@ -27,6 +27,7 @@ import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
 
 public interface SubscriptionApiService {
 
@@ -54,4 +55,6 @@ public interface SubscriptionApiService {
     public boolean changePlanWithPolicy(SubscriptionData subscription, String productName, BillingPeriod term,
                                         String priceList, DateTime requestedDate, ActionPolicy policy, CallContext context)
             throws EntitlementUserApiException;
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context);
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
index 1870bfe..9f5d397 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
@@ -16,12 +16,17 @@
 
 package com.ning.billing.entitlement.api.timeline;
 
+import org.joda.time.DateTime;
+
 import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 
@@ -35,7 +40,14 @@ public class RepairSubscriptionApiService extends DefaultSubscriptionApiService 
                                         @Named(DefaultEntitlementModule.REPAIR_NAMED) final EntitlementDao dao,
                                         final CatalogService catalogService,
                                         final PlanAligner planAligner,
+                                        final AddonUtils addonUtils,
                                         final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, dao, catalogService, planAligner, internalCallContextFactory);
+        super(clock, dao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+    }
+
+    // Nothing to do for repair as we pass all the repair events in the stream
+    @Override
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+        return 0;
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
index 32d968c..4272038 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class DefaultEffectiveSubscriptionEvent extends DefaultSubscriptionEvent implements EffectiveSubscriptionInternalEvent {
+
     public DefaultEffectiveSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate, final Long accountRecordId, final Long tenantRecordId) {
         super(in, startDate, accountRecordId, tenantRecordId);
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 3a76c51..d60a898 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -17,6 +17,8 @@
 package com.ning.billing.entitlement.api.user;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
@@ -37,10 +39,12 @@ import com.ning.billing.catalog.api.PlanSpecifier;
 import com.ning.billing.catalog.api.PriceList;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.api.Product;
+import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -67,18 +71,22 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
     private final EntitlementDao dao;
     private final CatalogService catalogService;
     private final PlanAligner planAligner;
+    private final AddonUtils addonUtils;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public DefaultSubscriptionApiService(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
-                                         final PlanAligner planAligner, final InternalCallContextFactory internalCallContextFactory) {
+                                         final PlanAligner planAligner, final AddonUtils addonUtils,
+                                         final InternalCallContextFactory internalCallContextFactory) {
         this.clock = clock;
         this.catalogService = catalogService;
         this.planAligner = planAligner;
         this.dao = dao;
+        this.addonUtils = addonUtils;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     public SubscriptionData createPlan(final SubscriptionBuilder builder, final Plan plan, final PhaseType initialPhase,
                                        final String realPriceList, final DateTime requestedDate, final DateTime effectiveDate, final DateTime processedDate,
@@ -214,6 +222,9 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
         dao.cancelSubscription(subscription, cancelEvent, internalCallContext, 0);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
@@ -353,9 +364,58 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
         dao.changePlan(subscription, changeEvents, internalCallContext);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
 
+        cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
         return (policy == ActionPolicy.IMMEDIATE);
     }
 
+
+    public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+
+        // If cancellation/change occur in the future, there is nothing to do
+        final DateTime now = clock.getUTCNow();
+        if (effectiveDate.compareTo(now) > 0) {
+            return 0;
+        }
+
+        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
+
+        final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
+
+        final List<SubscriptionData> subscriptionsToBeCancelled = new LinkedList<SubscriptionData>();
+        final List<EntitlementEvent> cancelEvents = new LinkedList<EntitlementEvent>();
+
+        for (final Subscription subscription : subscriptions) {
+            final SubscriptionData cur = (SubscriptionData) subscription;
+            if (cur.getState() == SubscriptionState.CANCELLED ||
+                cur.getCategory() != ProductCategory.ADD_ON) {
+                continue;
+            }
+
+            final Plan addonCurrentPlan = cur.getCurrentPlan();
+            if (baseProduct == null ||
+                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
+                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
+                //
+                // Perform AO cancellation using the effectiveDate of the BP
+                //
+                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
+                                                                                .setSubscriptionId(cur.getId())
+                                                                                .setActiveVersion(cur.getActiveVersion())
+                                                                                .setProcessedDate(now)
+                                                                                .setEffectiveDate(effectiveDate)
+                                                                                .setRequestedDate(now)
+                                                                                .setUserToken(context.getUserToken())
+                                                                                .setFromDisk(true));
+                subscriptionsToBeCancelled.add(cur);
+                cancelEvents.add(cancelEvent);
+            }
+        }
+
+        dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, context);
+        return subscriptionsToBeCancelled.size();
+    }
+
     private void validateRequestedDate(final SubscriptionData subscription, final DateTime now, final DateTime requestedDate)
             throws EntitlementUserApiException {
 
@@ -363,7 +423,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
         }
 
-        final SubscriptionTransition  previousTransition = subscription.getPreviousTransition();
+        final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
         if (previousTransition != null && previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
             throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
                                                   requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 0421c1a..7d88024 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -507,8 +507,7 @@ public class SubscriptionData extends EntityBase implements Subscription {
                 "Failed to find CurrentPhaseStart id = %s", getId().toString()));
     }
 
-    public void rebuildTransitions(final List<EntitlementEvent> inputEvents,
-            final Catalog catalog) {
+    public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
 
         if (inputEvents == null) {
             return;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 24d6975..35010b6 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -31,6 +31,7 @@ import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.alignment.PlanAligner;
 import com.ning.billing.entitlement.alignment.TimedPhase;
 import com.ning.billing.entitlement.api.EntitlementService;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
 import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
@@ -80,11 +81,14 @@ public class Engine implements EventListener, EntitlementService {
     private final NotificationQueueService notificationQueueService;
     private final InternalCallContextFactory internalCallContextFactory;
     private NotificationQueue subscriptionEventQueue;
+    private final SubscriptionApiService apiService;
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
                   final AddonUtils addonUtils, final InternalBus eventBus,
-                  final NotificationQueueService notificationQueueService, final InternalCallContextFactory internalCallContextFactory) {
+                  final NotificationQueueService notificationQueueService,
+                  final InternalCallContextFactory internalCallContextFactory,
+                  final SubscriptionApiService apiService) {
         this.clock = clock;
         this.dao = dao;
         this.planAligner = planAligner;
@@ -92,6 +96,7 @@ public class Engine implements EventListener, EntitlementService {
         this.eventBus = eventBus;
         this.notificationQueueService = notificationQueueService;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.apiService = apiService;
     }
 
     @Override
@@ -163,7 +168,6 @@ public class Engine implements EventListener, EntitlementService {
         //
         // Do any internal processing on that event before we send the event to the bus
         //
-
         int theRealSeqId = seqId;
         if (event.getType() == EventType.PHASE) {
             onPhaseEvent(subscription, context);
@@ -174,7 +178,7 @@ public class Engine implements EventListener, EntitlementService {
         try {
             final SubscriptionTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
             final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
-                    context.getAccountRecordId(), context.getTenantRecordId());
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
             eventBus.post(busEvent, context);
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
@@ -197,47 +201,8 @@ public class Engine implements EventListener, EntitlementService {
     }
 
     private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final InternalCallContext context) {
-        final DateTime now = clock.getUTCNow();
-        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-
-        final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
-
-        final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
-        final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
-        for (final Subscription subscription : subscriptions) {
-            final SubscriptionData cur = (SubscriptionData) subscription;
-            if (cur.getState() == SubscriptionState.CANCELLED ||
-                cur.getCategory() != ProductCategory.ADD_ON) {
-                continue;
-            }
+        return apiService.cancelAddOnsIfRequired(baseSubscription, event.getEffectiveDate(), context);
+    }
 
-            final Plan addonCurrentPlan = cur.getCurrentPlan();
-            if (baseProduct == null ||
-                addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
-                !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
-                //
-                // Perform AO cancellation using the effectiveDate of the BP
-                //
-                final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                                                                                .setSubscriptionId(cur.getId())
-                                                                                .setActiveVersion(cur.getActiveVersion())
-                                                                                .setProcessedDate(now)
-                                                                                .setEffectiveDate(event.getEffectiveDate())
-                                                                                .setRequestedDate(now)
-                                                                                .setUserToken(context.getUserToken())
-                                                                                .setFromDisk(true));
-
-                addOnCancellations.put(cur.getId(), cancelEvent);
-                addOnCancellationSubscriptions.put(cur.getId(), cur);
-            }
-        }
 
-        final int addOnSize = addOnCancellations.size();
-        int cancelSeq = addOnSize - 1;
-        for (final UUID key : addOnCancellations.keySet()) {
-            dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
-            cancelSeq--;
-        }
-        return addOnSize;
-    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index 7f813f8..c62db19 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -45,12 +46,14 @@ import com.ning.billing.entitlement.api.migration.AccountMigrationData.Subscript
 import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
 import com.ning.billing.entitlement.api.transfer.TransferCancelData;
+import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
@@ -75,6 +78,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
 import com.ning.billing.util.events.RepairEntitlementInternalEvent;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -387,10 +391,10 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 final EntitlementEventSqlDao eventsDaoFromSameTransaction = entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class);
                 for (final EntitlementEvent cur : initialEvents) {
                     eventsDaoFromSameTransaction.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
+
                 }
                 // Notify the Bus of the latest requested change, if needed
                 if (initialEvents.size() > 0) {
@@ -401,6 +405,8 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
+
+
     @Override
     public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -410,11 +416,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
                 for (final EntitlementEvent cur : recreateEvents) {
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
 
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -426,6 +430,24 @@ public class DefaultEntitlementDao implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+
+        transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                for (int i = 0; i < subscriptions.size(); i++) {
+                    final SubscriptionData subscription = subscriptions.get(i);
+                    final EntitlementEvent cancelEvent = cancelEvents.get(i);
+                    cancelSubscriptionFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, context, i);
+                }
+                return null;
+            }
+        });
+    }
+
+
+
+    @Override
     public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
@@ -496,10 +518,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
                 for (final EntitlementEvent cur : changeEventsTweakedWithMigrateBilling) {
 
                     transactional.create(new EntitlementEventModelDao(cur), context);
-                    recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()),
-                                                            context);
+
+                    final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+                    recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
                 }
 
                 // Notify the Bus of the latest requested change
@@ -606,10 +627,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
         final UUID subscriptionId = subscription.getId();
         cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
         entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class).create(new EntitlementEventModelDao(cancelEvent), context);
-        recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
-                                                cancelEvent.getEffectiveDate(),
-                                                new EntitlementNotificationKey(cancelEvent.getId(), seqId),
-                                                context);
+
+        final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
+        recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, context);
 
         // Notify the Bus of the requested change
         notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, context);
@@ -763,15 +783,6 @@ public class DefaultEntitlementDao implements EntitlementDao {
         return result;
     }
 
-
-    private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
-        final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
-        if (events.size() > 0) {
-            result.rebuildTransitions(events, catalogService.getFullCatalog());
-        }
-        return result;
-    }
-
     @Override
     public void migrate(final UUID accountId, final AccountMigrationData accountData, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -845,6 +856,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
         });
     }
 
+    private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
+        final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
+        if (events.size() > 0) {
+            result.rebuildTransitions(events, catalogService.getFullCatalog());
+        }
+        return result;
+    }
+
     private Subscription getBaseSubscription(final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
         final List<Subscription> subscriptions = getSubscriptionFromBundleId(bundleId, context);
         for (final Subscription cur : subscriptions) {
@@ -855,16 +874,41 @@ public class DefaultEntitlementDao implements EntitlementDao {
         return null;
     }
 
-    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
-                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+
+    //
+    // Either records a notfication or sends a bus event is operation is immediate
+    //
+    private void recordBusOrFutureNotificationFromTransaction(final SubscriptionData subscription, final EntitlementEvent event, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final boolean busEvent,
+                                                              final int seqId, final InternalCallContext context) {
+        if (busEvent) {
+            notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+        } else {
+            recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+                                                    event.getEffectiveDate(),
+                                                    new EntitlementNotificationKey(event.getId()),
+                                                    context);
+        }
+    }
+
+    //
+    // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
+    // - CREATE,
+    // - IMM CANCEL or CHANGE
+    //
+    private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
+                                            final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
         try {
-            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
-                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, null, notificationKey, context);
-        } catch (NoSuchNotificationQueue e) {
-            throw new RuntimeException(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+
+            final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
+
+            final SubscriptionTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
+            final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
+                                                                                                      context.getAccountRecordId(), context.getTenantRecordId());
+
+
+            eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+        } catch (EventBusException e) {
+            log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
         }
     }
 
@@ -877,6 +921,18 @@ public class DefaultEntitlementDao implements EntitlementDao {
         }
     }
 
+    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
+                                                         final NotificationKey notificationKey, final InternalCallContext context) {
+        try {
+            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                                                                                                           Engine.NOTIFICATION_QUEUE_NAME);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, null, notificationKey, context);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
     private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
                                                   final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
 
@@ -909,4 +965,20 @@ public class DefaultEntitlementDao implements EntitlementDao {
 
         transBundleDao.create(new SubscriptionBundleModelDao(bundleData), context);
     }
+
+    //
+    // Creates a copy of the existing subscriptions whose 'transitions' will reflect the new event
+    //
+    private SubscriptionData createSubscriptionWithNewEvent(final SubscriptionData subscription, EntitlementEvent newEvent) {
+
+        final SubscriptionData subscriptionWithNewEvent = new SubscriptionData(subscription, null, clock);
+        final List<EntitlementEvent> allEvents = new LinkedList<EntitlementEvent>();
+        if (subscriptionWithNewEvent.getEvents() != null) {
+            allEvents.addAll(subscriptionWithNewEvent.getEvents());
+        }
+        allEvents.add(newEvent);
+        subscriptionWithNewEvent.rebuildTransitions(allEvents, catalogService.getFullCatalog());
+        return subscriptionWithNewEvent;
+    }
+
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 841d78f..2b83ab2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -78,6 +78,8 @@ public interface EntitlementDao {
 
     public void cancelSubscription(SubscriptionData subscription, EntitlementEvent cancelEvent, InternalCallContext context, int cancelSeq);
 
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context);
+
     public void uncancelSubscription(SubscriptionData subscription, List<EntitlementEvent> uncancelEvents, InternalCallContext context);
 
     public void changePlan(SubscriptionData subscription, List<EntitlementEvent> changeEvents, InternalCallContext context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index 9093390..38695ac 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -175,6 +175,10 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         addEvents(subscription.getId(), changeEvents);
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 0b39cea..6ef7523 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -280,6 +280,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
     }
 
     @Override
+    public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+        synchronized (events) {
+            for (int i = 0; i < subscriptions.size(); i++) {
+                cancelSubscription(subscriptions.get(i), cancelEvents.get(i), context, 0);
+            }
+        }
+    }
+
+    @Override
     public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
         synchronized (events) {
             cancelNextChangeEvent(subscription.getId());