killbill-memoizeit

analytics: start listening to future events Due to ordering

6/29/2012 7:06:50 PM

Details

diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index 87a7060..4daeb39 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -24,6 +24,7 @@ import com.ning.billing.account.api.AccountCreationEvent;
 import com.ning.billing.entitlement.api.timeline.RepairEntitlementEvent;
 import com.ning.billing.entitlement.api.user.EffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.RequestedSubscriptionEvent;
 import com.ning.billing.invoice.api.EmptyInvoiceEvent;
 import com.ning.billing.invoice.api.InvoiceCreationEvent;
 import com.ning.billing.payment.api.PaymentErrorEvent;
@@ -84,6 +85,13 @@ public class AnalyticsListener {
     }
 
     @Subscribe
+    public void handleFutureSubscriptionTransitionChange(final RequestedSubscriptionEvent eventRequested) throws AccountApiException, EntitlementUserApiException {
+        if (eventRequested.getEffectiveTransitionTime().isAfter(eventRequested.getRequestedTransitionTime())) {
+            bstRecorder.subscriptionChanged(eventRequested);
+        }
+    }
+
+    @Subscribe
     public void handleAccountCreation(final AccountCreationEvent event) {
         bacRecorder.accountCreated(event.getData());
     }
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
index feaac55..1d00cf0 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
@@ -36,7 +36,9 @@ import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.entitlement.api.user.EffectiveSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.RequestedSubscriptionEvent;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.entitlement.api.user.SubscriptionEvent;
 
 public class BusinessSubscriptionTransitionRecorder {
     private static final Logger log = LoggerFactory.getLogger(BusinessSubscriptionTransitionRecorder.class);
@@ -54,25 +56,27 @@ public class BusinessSubscriptionTransitionRecorder {
         this.accountApi = accountApi;
     }
 
-
     public void subscriptionCreated(final EffectiveSubscriptionEvent created) throws AccountApiException, EntitlementUserApiException {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCreated(created.getNextPlan(), catalogService.getFullCatalog(), created.getEffectiveTransitionTime(), created.getSubscriptionStartDate());
         recordTransition(event, created);
     }
 
-
     public void subscriptionRecreated(final EffectiveSubscriptionEvent recreated) throws AccountApiException, EntitlementUserApiException {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionRecreated(recreated.getNextPlan(), catalogService.getFullCatalog(), recreated.getEffectiveTransitionTime(), recreated.getSubscriptionStartDate());
         recordTransition(event, recreated);
     }
 
-
     public void subscriptionCancelled(final EffectiveSubscriptionEvent cancelled) throws AccountApiException, EntitlementUserApiException {
         // cancelled.getNextPlan() is null here - need to look at the previous one to create the correct event name
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCancelled(cancelled.getPreviousPlan(), catalogService.getFullCatalog(), cancelled.getEffectiveTransitionTime(), cancelled.getSubscriptionStartDate());
         recordTransition(event, cancelled);
     }
 
+    public void subscriptionChanged(final RequestedSubscriptionEvent changed) throws EntitlementUserApiException, AccountApiException {
+        // Future change
+        final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionChanged(changed.getNextPlan(), catalogService.getFullCatalog(), changed.getEffectiveTransitionTime(), changed.getSubscriptionStartDate());
+        recordTransition(event, changed);
+    }
 
     public void subscriptionChanged(final EffectiveSubscriptionEvent changed) throws AccountApiException, EntitlementUserApiException {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionChanged(changed.getNextPlan(), catalogService.getFullCatalog(), changed.getEffectiveTransitionTime(), changed.getSubscriptionStartDate());
@@ -84,8 +88,7 @@ public class BusinessSubscriptionTransitionRecorder {
         recordTransition(event, phaseChanged);
     }
 
-    void recordTransition(final BusinessSubscriptionEvent event, final EffectiveSubscriptionEvent transition)
-            throws AccountApiException, EntitlementUserApiException {
+    void recordTransition(final BusinessSubscriptionEvent event, final SubscriptionEvent transition) throws AccountApiException, EntitlementUserApiException {
         Currency currency = null;
         String externalKey = null;
         String accountKey = null;
@@ -102,8 +105,8 @@ public class BusinessSubscriptionTransitionRecorder {
             }
         }
 
-        // The ISubscriptionTransition interface gives us all the prev/next information we need but the start date
-        // of the previous plan. We need to retrieve it from our own transitions table
+        // The SubscriptionEvent interface gives us all the prev/next information we need but the start date
+        // of the previous phase. We need to retrieve it from our own transitions table
         DateTime previousEffectiveTransitionTime = null;
         // For creation events, the prev subscription will always be null
         if (event.getEventType() != BusinessSubscriptionEvent.EventType.ADD) {
@@ -132,7 +135,34 @@ public class BusinessSubscriptionTransitionRecorder {
             nextSubscription = new BusinessSubscription(transition.getNextPriceList(), transition.getNextPlan(), transition.getNextPhase(), currency, transition.getEffectiveTransitionTime(), transition.getNextState(), transition.getSubscriptionId(), transition.getBundleId(), catalogService.getFullCatalog());
         }
 
-        record(transition.getSubscriptionId(), transition.getTotalOrdering(), externalKey, accountKey, transition.getRequestedTransitionTime(), event, prevSubscription, nextSubscription);
+        catchUpIfNeededAndRecord(transition.getSubscriptionId(), transition.getTotalOrdering(), externalKey, accountKey, transition.getRequestedTransitionTime(), event, prevSubscription, nextSubscription);
+    }
+
+    public void catchUpIfNeededAndRecord(final UUID subscriptionId, final Long totalOrdering, final String externalKey, final String accountKey, final DateTime requestedDateTime,
+                                         final BusinessSubscriptionEvent event, final BusinessSubscription prevSubscription, final BusinessSubscription nextSubscription) {
+        // There is no ordering guaranteed with events on the bus. This can be problematic on e.g. subscription creation:
+        // the requested future change from trial to evergreen could be received before the actual creation event.
+        // In this case, we would have two subscriptions in BST, with both null for the previous transition.
+        // To work around this, we need to update bst as we go
+        if (BusinessSubscriptionEvent.EventType.ADD.equals(event.getEventType())) {
+            final List<BusinessSubscriptionTransition> transitions = sqlDao.getTransitionForSubscription(subscriptionId.toString());
+            final BusinessSubscriptionTransition firstTransition = transitions.get(0);
+            if (firstTransition.getPreviousSubscription() == null) {
+                final BusinessSubscriptionTransition updatedFirstTransition = new BusinessSubscriptionTransition(
+                        firstTransition.getSubscriptionId(),
+                        firstTransition.getTotalOrdering(),
+                        firstTransition.getExternalKey(),
+                        firstTransition.getAccountKey(),
+                        firstTransition.getRequestedTimestamp(),
+                        firstTransition.getEvent(),
+                        nextSubscription,
+                        firstTransition.getNextSubscription()
+                );
+                sqlDao.updateTransition(updatedFirstTransition.getTotalOrdering(), updatedFirstTransition);
+            }
+        }
+
+        record(subscriptionId, totalOrdering, externalKey, accountKey, requestedDateTime, event, prevSubscription, nextSubscription);
     }
 
     // Public for internal reasons
diff --git a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.java b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.java
index fe90321..9e35630 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.java
@@ -32,9 +32,15 @@ public interface BusinessSubscriptionTransitionSqlDao {
     @SqlQuery
     List<BusinessSubscriptionTransition> getTransitions(@Bind("external_key") final String externalKey);
 
+    @SqlQuery
+    List<BusinessSubscriptionTransition> getTransitionForSubscription(@Bind("subscription_id") final String subscriptionId);
+
     @SqlUpdate
     int createTransition(@BusinessSubscriptionTransitionBinder final BusinessSubscriptionTransition transition);
 
     @SqlUpdate
+    void updateTransition(@Bind("total_ordering") long totalOrdering, @BusinessSubscriptionTransitionBinder BusinessSubscriptionTransition updatedFirstTransition);
+
+    @SqlUpdate
     void test();
 }
diff --git a/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.sql.stg b/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.sql.stg
index ca971e0..c22bde9 100644
--- a/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.sql.stg
+++ b/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionSqlDao.sql.stg
@@ -42,6 +42,48 @@ getTransitions(external_key) ::= <<
   ;
 >>
 
+getTransitionForSubscription(subscription_id) ::= <<
+  select
+    subscription_id
+  , total_ordering
+  , external_key
+  , account_key
+  , requested_timestamp
+  , event
+  , prev_product_name
+  , prev_product_type
+  , prev_product_category
+  , prev_slug
+  , prev_phase
+  , prev_billing_period
+  , prev_price
+  , prev_price_list
+  , prev_mrr
+  , prev_currency
+  , prev_start_date
+  , prev_state
+  , prev_subscription_id
+  , prev_bundle_id
+  , next_product_name
+  , next_product_type
+  , next_product_category
+  , next_slug
+  , next_phase
+  , next_billing_period
+  , next_price
+  , next_price_list
+  , next_mrr
+  , next_currency
+  , next_start_date
+  , next_state
+  , next_subscription_id
+  , next_bundle_id
+  from bst
+  where subscription_id =: subscription_id
+  order by requested_timestamp asc
+  ;
+>>
+
 createTransition() ::= <<
   insert ignore into bst(
     subscription_id
@@ -116,6 +158,45 @@ createTransition() ::= <<
   );
 >>
 
+updateTransition() ::= <<
+  update bst set
+    subscription_id = :subscription_id
+  , total_ordering = :total_ordering
+  , external_key = :external_key
+  , account_key = :account_key
+  , requested_timestamp = :requested_timestamp
+  , event = :event
+  , prev_product_name = :prev_product_name
+  , prev_product_type = :prev_product_type
+  , prev_product_category = :prev_product_category
+  , prev_slug = :prev_slug
+  , prev_phase = :prev_phase
+  , prev_billing_period = :prev_billing_period
+  , prev_price = :prev_price
+  , prev_price_list = :prev_price_list
+  , prev_mrr = :prev_mrr
+  , prev_currency = :prev_currency
+  , prev_start_date = :prev_start_date
+  , prev_state = :prev_state
+  , prev_subscription_id = :prev_subscription_id
+  , prev_bundle_id = :prev_bundle_id
+  , next_product_name = :next_product_name
+  , next_product_type = :next_product_type
+  , next_product_category = :next_product_category
+  , next_slug = :next_slug
+  , next_phase = :next_phase
+  , next_billing_period = :next_billing_period
+  , next_price = :next_price
+  , next_price_list = :next_price_list
+  , next_mrr = :next_mrr
+  , next_currency = :next_currency
+  , next_start_date = :next_start_date
+  , next_state = :next_state
+  , next_subscription_id = :next_subscription_id
+  , next_bundle_id = :next_bundle_id
+  where total_ordering = :total_ordering
+>>
+
 test() ::= <<
   select 1 from bst;
 >>
diff --git a/analytics/src/test/java/com/ning/billing/analytics/MockBusinessSubscriptionTransitionSqlDao.java b/analytics/src/test/java/com/ning/billing/analytics/MockBusinessSubscriptionTransitionSqlDao.java
index a442f2a..5f3a16d 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/MockBusinessSubscriptionTransitionSqlDao.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/MockBusinessSubscriptionTransitionSqlDao.java
@@ -36,6 +36,11 @@ public class MockBusinessSubscriptionTransitionSqlDao implements BusinessSubscri
     }
 
     @Override
+    public List<BusinessSubscriptionTransition> getTransitionForSubscription(@Bind("subscription_id") final String subscriptionId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public int createTransition(@BusinessSubscriptionTransitionBinder final BusinessSubscriptionTransition transition) {
         if (content.get(transition.getExternalKey()) == null) {
             content.put(transition.getExternalKey(), new ArrayList<BusinessSubscriptionTransition>());
@@ -45,6 +50,11 @@ public class MockBusinessSubscriptionTransitionSqlDao implements BusinessSubscri
     }
 
     @Override
+    public void updateTransition(@Bind("total_ordering") final long totalOrdering, @BusinessSubscriptionTransitionBinder final BusinessSubscriptionTransition updatedFirstTransition) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void test() {
     }
 }