killbill-memoizeit

entitlement: reimplement logic to send requested events Requested

7/2/2012 10:34:00 PM

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
index cdc6797..66cf27c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
@@ -165,7 +165,7 @@ public class SubscriptionDataRepair extends SubscriptionData {
                                                                                 .setRequestedDate(now)
                                                                                 .setUserToken(context.getUserToken())
                                                                                 .setFromDisk(true));
-                repairDao.cancelSubscription(cur.getId(), cancelEvent, context, 0);
+                repairDao.cancelSubscription(cur, cancelEvent, context, 0);
                 cur.rebuildTransitions(repairDao.getEventsForSubscription(cur.getId()), catalogService.getFullCatalog());
             }
         }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultRequestedSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultRequestedSubscriptionEvent.java
index f03ba35..38b6b98 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultRequestedSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultRequestedSubscriptionEvent.java
@@ -23,6 +23,7 @@ import org.joda.time.DateTime;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.ning.billing.entitlement.api.SubscriptionTransitionType;
+import com.ning.billing.entitlement.events.EntitlementEvent;
 
 public class DefaultRequestedSubscriptionEvent extends DefaultSubscriptionEvent implements RequestedSubscriptionEvent {
     public DefaultRequestedSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate) {
@@ -52,4 +53,9 @@ public class DefaultRequestedSubscriptionEvent extends DefaultSubscriptionEvent 
               previousPhase, previousPriceList, nextState, nextPlan, nextPhase, nextPriceList, totalOrdering, userToken,
               transitionType, remainingEventsForUserOperation, startDate);
     }
+
+    public DefaultRequestedSubscriptionEvent(final SubscriptionData subscription, final EntitlementEvent nextEvent) {
+        this(nextEvent.getId(), nextEvent.getSubscriptionId(), subscription.getBundleId(), nextEvent.getRequestedDate(), nextEvent.getEffectiveDate(),
+             null, null, null, null, null, null, null, null, nextEvent.getTotalOrdering(), null, null, 0, null);
+    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 4469661..99e3e58 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -139,7 +139,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
                 events.add(nextPhaseEvent);
             }
             if (reCreate) {
-                dao.recreateSubscription(subscription.getId(), events, context);
+                dao.recreateSubscription(subscription, events, context);
             } else {
                 dao.createSubscription(subscription, events, context);
             }
@@ -179,7 +179,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
                                                                             .setUserToken(context.getUserToken())
                                                                             .setFromDisk(true));
 
-            dao.cancelSubscription(subscription.getId(), cancelEvent, context, 0);
+            dao.cancelSubscription(subscription, cancelEvent, context, 0);
             subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId()), catalogService.getFullCatalog());
             return (policy == ActionPolicy.IMMEDIATE);
         } catch (CatalogApiException e) {
@@ -213,7 +213,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
             uncancelEvents.add(nextPhaseEvent);
         }
 
-        dao.uncancelSubscription(subscription.getId(), uncancelEvents, context);
+        dao.uncancelSubscription(subscription, uncancelEvents, context);
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId()), catalogService.getFullCatalog());
 
         return true;
@@ -285,7 +285,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
                 changeEvents.add(nextPhaseEvent);
             }
             changeEvents.add(changeEvent);
-            dao.changePlan(subscription.getId(), changeEvents, context);
+            dao.changePlan(subscription, changeEvents, context);
             subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId()), catalogService.getFullCatalog());
             return (policy == ActionPolicy.IMMEDIATE);
         } catch (CatalogApiException e) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index a4a926e..57b21d3 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -16,15 +16,11 @@
 
 package com.ning.billing.entitlement.engine.core;
 
-
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.swing.text.html.HTMLDocument.HTMLReader.IsindexAction;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,8 +65,6 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
 public class Engine implements EventListener, EntitlementService {
-
-
     public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
     public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
@@ -81,21 +75,20 @@ public class Engine implements EventListener, EntitlementService {
     private final PlanAligner planAligner;
     private final AddonUtils addonUtils;
     private final Bus eventBus;
-
     private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
     private final CallContextFactory factory;
     private final SubscriptionFactory subscriptionFactory;
+
     private NotificationQueue subscriptionEventQueue;
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
-            final EntitlementConfig config,
-            final AddonUtils addonUtils, final Bus eventBus,
-            final NotificationQueueService notificationQueueService,
-            final SubscriptionFactory subscriptionFactory,
-            final CallContextFactory factory) {
-        super();
+                  final EntitlementConfig config,
+                  final AddonUtils addonUtils, final Bus eventBus,
+                  final NotificationQueueService notificationQueueService,
+                  final SubscriptionFactory subscriptionFactory,
+                  final CallContextFactory factory) {
         this.clock = clock;
         this.dao = dao;
         this.planAligner = planAligner;
@@ -114,32 +107,29 @@ public class Engine implements EventListener, EntitlementService {
 
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
     public void initialize() {
-
         try {
-            subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
-                    NOTIFICATION_QUEUE_NAME,
-                    new NotificationQueueHandler() {
+            final NotificationQueueHandler queueHandler = new NotificationQueueHandler() {
                 @Override
                 public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
-
-                    if (! (inputKey instanceof EntitlementNotificationKey)) {
+                    if (!(inputKey instanceof EntitlementNotificationKey)) {
                         log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
                         return;
                     }
-                    EntitlementNotificationKey key = (EntitlementNotificationKey) inputKey;
-                    
-                     final EntitlementEvent event = dao.getEventById(key.getEventId());
+
+                    final EntitlementNotificationKey key = (EntitlementNotificationKey) inputKey;
+                    final EntitlementEvent event = dao.getEventById(key.getEventId());
                     if (event == null) {
                         log.warn("Failed to extract event for notification key {}", inputKey);
                         return;
                     }
+
                     final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
                     final CallContext context = factory.createCallContext("SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                     processEventReady(event, key.getSeqId(), context);
                 }
-            },
-            new NotificationConfig() {
+            };
 
+            final NotificationConfig notificationConfig = new NotificationConfig() {
                 @Override
                 public long getSleepTimeMs() {
                     return config.getSleepTimeMs();
@@ -149,8 +139,12 @@ public class Engine implements EventListener, EntitlementService {
                 public boolean isNotificationProcessingOff() {
                     return config.isNotificationProcessingOff();
                 }
-            }
-            );
+            };
+
+            subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+                                                                                      NOTIFICATION_QUEUE_NAME,
+                                                                                      queueHandler,
+                                                                                      notificationConfig);
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
@@ -169,12 +163,12 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-
     @Override
     public void processEventReady(final EntitlementEvent event, final int seqId, final CallContext context) {
         if (!event.isActive()) {
             return;
         }
+
         final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, event.getSubscriptionId());
         if (subscription == null) {
             log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
@@ -192,10 +186,10 @@ public class Engine implements EventListener, EntitlementService {
         int theRealSeqId = seqId;
         if (event.getType() == EventType.PHASE) {
             onPhaseEvent(subscription, context);
-        } else if (event.getType() == EventType.API_USER &&
-                subscription.getCategory() == ProductCategory.BASE) {
+        } else if (event.getType() == EventType.API_USER && subscription.getCategory() == ProductCategory.BASE) {
             theRealSeqId = onBasePlanEvent(subscription, (ApiEvent) event, context);
         }
+
         try {
             eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId));
         } catch (EventBusException e) {
@@ -203,41 +197,36 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-
     private void onPhaseEvent(final SubscriptionData subscription, final CallContext context) {
         try {
             final DateTime now = clock.getUTCNow();
             final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, now);
             final PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
                     PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
-                        null;
-                    if (nextPhaseEvent != null) {
-                        dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent, context);
-                    }
+                    null;
+            if (nextPhaseEvent != null) {
+                dao.createNextPhaseEvent(subscription, nextPhaseEvent, context);
+            }
         } catch (EntitlementError e) {
             log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
         }
     }
 
     private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final CallContext context) {
-
         final DateTime now = clock.getUTCNow();
-
-        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ?
-                null : baseSubscription.getCurrentPlan().getProduct();
+        final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
 
         final List<Subscription> subscriptions = dao.getSubscriptions(subscriptionFactory, baseSubscription.getBundleId());
 
-
         final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
-
-        final Iterator<Subscription> it = subscriptions.iterator();
-        while (it.hasNext()) {
-            final SubscriptionData cur = (SubscriptionData) it.next();
+        final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
+        for (final Subscription subscription : subscriptions) {
+            final SubscriptionData cur = (SubscriptionData) subscription;
             if (cur.getState() == SubscriptionState.CANCELLED ||
                     cur.getCategory() != ProductCategory.ADD_ON) {
                 continue;
             }
+
             final Plan addonCurrentPlan = cur.getCurrentPlan();
             if (baseProduct == null ||
                     addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
@@ -246,23 +235,26 @@ public class Engine implements EventListener, EntitlementService {
                 // Perform AO cancellation using the effectiveDate of the BP
                 //
                 final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                .setSubscriptionId(cur.getId())
-                .setActiveVersion(cur.getActiveVersion())
-                .setProcessedDate(now)
-                .setEffectiveDate(event.getEffectiveDate())
-                .setRequestedDate(now)
-                .setUserToken(context.getUserToken())
-                .setFromDisk(true));
+                                                                                .setSubscriptionId(cur.getId())
+                                                                                .setActiveVersion(cur.getActiveVersion())
+                                                                                .setProcessedDate(now)
+                                                                                .setEffectiveDate(event.getEffectiveDate())
+                                                                                .setRequestedDate(now)
+                                                                                .setUserToken(context.getUserToken())
+                                                                                .setFromDisk(true));
 
                 addOnCancellations.put(cur.getId(), cancelEvent);
+                addOnCancellationSubscriptions.put(cur.getId(), cur);
             }
         }
+
         final int addOnSize = addOnCancellations.size();
         int cancelSeq = addOnSize - 1;
         for (final UUID key : addOnCancellations.keySet()) {
-            dao.cancelSubscription(key, addOnCancellations.get(key), context, cancelSeq);
+            dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
             cancelSeq--;
         }
+
         return addOnSize;
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 2d7f212..24c7cca 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -40,6 +40,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
+import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.entitlement.api.SubscriptionFactory;
@@ -49,6 +50,7 @@ import com.ning.billing.entitlement.api.migration.AccountMigrationData.Subscript
 import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementEvent;
 import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
+import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
@@ -87,10 +89,11 @@ public class AuditedEntitlementDao implements EntitlementDao {
     private final NotificationQueueService notificationQueueService;
     private final AddonUtils addonUtils;
     private final Bus eventBus;
+    private final CatalogService catalogService;
 
     @Inject
     public AuditedEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
-                                 final NotificationQueueService notificationQueueService, final Bus eventBus) {
+                                 final NotificationQueueService notificationQueueService, final Bus eventBus, final CatalogService catalogService) {
         this.clock = clock;
         this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
         this.eventsDao = dbi.onDemand(EntitlementEventSqlDao.class);
@@ -98,6 +101,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         this.notificationQueueService = notificationQueueService;
         this.addonUtils = addonUtils;
         this.eventBus = eventBus;
+        this.catalogService = catalogService;
     }
 
     @Override
@@ -206,10 +210,11 @@ public class AuditedEntitlementDao implements EntitlementDao {
     }
 
     @Override
-    public void createNextPhaseEvent(final UUID subscriptionId, final EntitlementEvent nextPhase, final CallContext context) {
+    public void createNextPhaseEvent(final SubscriptionData subscription, final EntitlementEvent nextPhase, final CallContext context) {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional, final TransactionStatus status) throws Exception {
+                final UUID subscriptionId = subscription.getId();
                 cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
                 transactional.insertEvent(nextPhase, context);
 
@@ -221,6 +226,9 @@ public class AuditedEntitlementDao implements EntitlementDao {
                                                         nextPhase.getEffectiveDate(),
                                                         new EntitlementNotificationKey(nextPhase.getId()));
 
+                // Notify the Bus of the requested change
+                notifyBusOfRequestedChange(transactional, subscription, nextPhase);
+
                 return null;
             }
         });
@@ -290,13 +298,19 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 }
 
                 eventsDaoFromSameTransaction.insertAuditFromTransaction(audits, context);
+
+                // Notify the Bus of the latest requested change, if needed
+                if (initialEvents.size() > 0) {
+                    notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1));
+                }
+
                 return null;
             }
         });
     }
 
     @Override
-    public void recreateSubscription(final UUID subscriptionId, final List<EntitlementEvent> recreateEvents, final CallContext context) {
+    public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final CallContext context) {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional,
@@ -313,16 +327,21 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 }
 
                 transactional.insertAuditFromTransaction(audits, context);
+
+                // Notify the Bus of the latest requested change
+                notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1));
+
                 return null;
             }
         });
     }
 
     @Override
-    public void cancelSubscription(final UUID subscriptionId, final EntitlementEvent cancelEvent, final CallContext context, final int seqId) {
+    public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final CallContext context, final int seqId) {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional, final TransactionStatus status) throws Exception {
+                final UUID subscriptionId = subscription.getId();
                 cancelNextCancelEventFromTransaction(subscriptionId, transactional, context);
                 cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
                 cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
@@ -336,16 +355,21 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 recordFutureNotificationFromTransaction(transactional,
                                                         cancelEvent.getEffectiveDate(),
                                                         new EntitlementNotificationKey(cancelEvent.getId(), seqId));
+
+                // Notify the Bus of the requested change
+                notifyBusOfRequestedChange(transactional, subscription, cancelEvent);
+
                 return null;
             }
         });
     }
 
     @Override
-    public void uncancelSubscription(final UUID subscriptionId, final List<EntitlementEvent> uncancelEvents, final CallContext context) {
+    public void uncancelSubscription(final SubscriptionData subscription, final List<EntitlementEvent> uncancelEvents, final CallContext context) {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional, final TransactionStatus status) throws Exception {
+                final UUID subscriptionId = subscription.getId();
                 EntitlementEvent cancelledEvent = null;
                 final Date now = clock.getUTCNow().toDate();
                 final List<EntitlementEvent> events = transactional.getFutureActiveEventForSubscription(subscriptionId.toString(), now);
@@ -377,17 +401,22 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     }
 
                     transactional.insertAuditFromTransaction(eventAudits, context);
+
+                    // Notify the Bus of the latest requested change
+                    notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1));
                 }
+
                 return null;
             }
         });
     }
 
     @Override
-    public void changePlan(final UUID subscriptionId, final List<EntitlementEvent> changeEvents, final CallContext context) {
+    public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final CallContext context) {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional, final TransactionStatus status) throws Exception {
+                final UUID subscriptionId = subscription.getId();
                 cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
                 cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
 
@@ -403,6 +432,11 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 }
 
                 transactional.insertAuditFromTransaction(eventAudits, context);
+
+                // Notify the Bus of the latest requested change
+                final EntitlementEvent finalEvent = changeEvents.get(changeEvents.size() - 1);
+                notifyBusOfRequestedChange(transactional, subscription, finalEvent);
+
                 return null;
             }
         });
@@ -559,7 +593,6 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     final SubscriptionBundleData bundleData = curBundle.getData();
 
                     for (final SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
-
                         final SubscriptionData subData = curSubscription.getData();
                         for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                             transactional.insertEvent(curEvent, context);
@@ -573,7 +606,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
                         transSubDao.insertSubscription(subData, context);
                         recordId = transSubDao.getRecordId(subData.getId().toString());
                         audits.add(new EntityAudit(TableName.SUBSCRIPTIONS, recordId, ChangeType.INSERT));
+
+                        // Notify the Bus of the latest requested change
+                        final EntitlementEvent finalEvent = curSubscription.getInitialEvents().get(curSubscription.getInitialEvents().size() - 1);
+                        notifyBusOfRequestedChange(transactional, subData, finalEvent);
                     }
+
                     transBundleDao.insertBundle(bundleData, context);
                     recordId = transBundleDao.getRecordId(bundleData.getId().toString());
                     audits.add(new EntityAudit(TableName.BUNDLES, recordId, ChangeType.INSERT));
@@ -608,6 +646,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 }
 
                 try {
+                    // Note: we don't send a requested change event here, but a repair event
                     final RepairEntitlementEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow());
                     eventBus.postFromTransaction(busEvent, transactional);
                 } catch (EventBusException e) {
@@ -641,4 +680,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
             throw new RuntimeException(e);
         }
     }
+
+    private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription, final EntitlementEvent nextEvent) {
+        try {
+            eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional);
+        } catch (EventBusException e) {
+            log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
+        }
+    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 710391b..b85a47a 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -56,7 +56,7 @@ public interface EntitlementDao {
     public void updateChargedThroughDate(final SubscriptionData subscription, final CallContext context);
 
     // Event apis
-    public void createNextPhaseEvent(final UUID subscriptionId, final EntitlementEvent nextPhase, final CallContext context);
+    public void createNextPhaseEvent(final SubscriptionData subscription, final EntitlementEvent nextPhase, final CallContext context);
 
     public EntitlementEvent getEventById(final UUID eventId);
 
@@ -69,13 +69,13 @@ public interface EntitlementDao {
     // Subscription creation, cancellation, changePlan apis
     public void createSubscription(final SubscriptionData subscription, final List<EntitlementEvent> initialEvents, final CallContext context);
 
-    public void recreateSubscription(final UUID subscriptionId, final List<EntitlementEvent> recreateEvents, final CallContext context);
+    public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final CallContext context);
 
-    public void cancelSubscription(final UUID subscriptionId, final EntitlementEvent cancelEvent, final CallContext context, final int cancelSeq);
+    public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final CallContext context, final int cancelSeq);
 
-    public void uncancelSubscription(final UUID subscriptionId, final List<EntitlementEvent> uncancelEvents, final CallContext context);
+    public void uncancelSubscription(final SubscriptionData subscription, final List<EntitlementEvent> uncancelEvents, final CallContext context);
 
-    public void changePlan(final UUID subscriptionId, final List<EntitlementEvent> changeEvents, final CallContext context);
+    public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final CallContext context);
 
     public void migrate(final UUID accountId, final AccountMigrationData data, final CallContext context);
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index 653e1ca..ed9cf13 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -92,14 +92,15 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public void recreateSubscription(final UUID subscriptionId,
+    public void recreateSubscription(final SubscriptionData subscription,
                                      final List<EntitlementEvent> recreateEvents, final CallContext context) {
-        addEvents(subscriptionId, recreateEvents);
+        addEvents(subscription.getId(), recreateEvents);
     }
 
     @Override
-    public void cancelSubscription(final UUID subscriptionId,
+    public void cancelSubscription(final SubscriptionData subscription,
                                    final EntitlementEvent cancelEvent, final CallContext context, final int cancelSeq) {
+        final UUID subscriptionId = subscription.getId();
         final long activeVersion = cancelEvent.getActiveVersion();
         addEvents(subscriptionId, Collections.singletonList(cancelEvent));
         final SubscriptionRepairEvent target = getRepairSubscriptionEvents(subscriptionId);
@@ -114,9 +115,9 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public void changePlan(final UUID subscriptionId,
+    public void changePlan(final SubscriptionData subscription,
                            final List<EntitlementEvent> changeEvents, final CallContext context) {
-        addEvents(subscriptionId, changeEvents);
+        addEvents(subscription.getId(), changeEvents);
     }
 
     @Override
@@ -142,7 +143,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public void uncancelSubscription(final UUID subscriptionId,
+    public void uncancelSubscription(final SubscriptionData subscription,
                                      final List<EntitlementEvent> uncancelEvents, final CallContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
@@ -204,7 +205,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
     }
 
     @Override
-    public void createNextPhaseEvent(final UUID subscriptionId,
+    public void createNextPhaseEvent(final SubscriptionData subscription,
                                      final EntitlementEvent nextPhase, final CallContext context) {
         throw new EntitlementError(NOT_IMPLEMENTED);
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index ec5c17e..06032f7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -169,7 +169,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
     @Override
-    public void recreateSubscription(final UUID subscriptionId,
+    public void recreateSubscription(final SubscriptionData subscription,
                                      final List<EntitlementEvent> recreateEvents, final CallContext context) {
 
         synchronized (events) {
@@ -233,9 +233,9 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
     @Override
-    public void createNextPhaseEvent(final UUID subscriptionId, final EntitlementEvent nextPhase,
+    public void createNextPhaseEvent(final SubscriptionData subscription, final EntitlementEvent nextPhase,
                                      final CallContext context) {
-        cancelNextPhaseEvent(subscriptionId);
+        cancelNextPhaseEvent(subscription.getId());
         insertEvent(nextPhase);
     }
 
@@ -271,20 +271,20 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
     @Override
-    public void cancelSubscription(final UUID subscriptionId, final EntitlementEvent cancelEvent,
+    public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent,
                                    final CallContext context, final int seqId) {
         synchronized (events) {
-            cancelNextPhaseEvent(subscriptionId);
+            cancelNextPhaseEvent(subscription.getId());
             insertEvent(cancelEvent);
         }
     }
 
     @Override
-    public void changePlan(final UUID subscriptionId, final List<EntitlementEvent> changeEvents,
+    public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents,
                            final CallContext context) {
         synchronized (events) {
-            cancelNextChangeEvent(subscriptionId);
-            cancelNextPhaseEvent(subscriptionId);
+            cancelNextChangeEvent(subscription.getId());
+            cancelNextPhaseEvent(subscription.getId());
             events.addAll(changeEvents);
             for (final EntitlementEvent cur : changeEvents) {
                 recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
@@ -348,7 +348,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
     @Override
-    public void uncancelSubscription(final UUID subscriptionId, final List<EntitlementEvent> uncancelEvents,
+    public void uncancelSubscription(final SubscriptionData subscription, final List<EntitlementEvent> uncancelEvents,
                                      final CallContext context) {
 
         synchronized (events) {
@@ -356,7 +356,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             final Iterator<EntitlementEvent> it = events.descendingIterator();
             while (it.hasNext()) {
                 final EntitlementEvent cur = it.next();
-                if (cur.getSubscriptionId() != subscriptionId) {
+                if (cur.getSubscriptionId() != subscription.getId()) {
                     continue;
                 }
                 if (cur.getType() == EventType.API_USER &&
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 929f03d..6a93d3c 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -24,6 +24,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
 import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 
 import com.google.inject.Inject;
+import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.entitlement.engine.addon.AddonUtils;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.clock.Clock;
@@ -35,8 +36,8 @@ public class MockEntitlementDaoSql extends AuditedEntitlementDao implements Mock
 
     @Inject
     public MockEntitlementDaoSql(final IDBI dbi, final Clock clock, final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
-                                 final Bus eventBus) {
-        super(dbi, clock, addonUtils, notificationQueueService, eventBus);
+                                 final Bus eventBus, final CatalogService catalogService) {
+        super(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
         this.resetDao = dbi.onDemand(ResetSqlDao.class);
     }