killbill-memoizeit

entitlement: optimize EventsStreamBuilder Speed it up

11/21/2013 10:08:01 PM

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/block/BlockingChecker.java b/entitlement/src/main/java/com/ning/billing/entitlement/block/BlockingChecker.java
index c466378..e91dc45 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/block/BlockingChecker.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/block/BlockingChecker.java
@@ -16,11 +16,13 @@
 
 package com.ning.billing.entitlement.block;
 
+import java.util.List;
 import java.util.UUID;
 
 import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.entitlement.api.Blockable;
 import com.ning.billing.entitlement.api.BlockingApiException;
+import com.ning.billing.entitlement.api.BlockingState;
 import com.ning.billing.entitlement.api.BlockingStateType;
 
 public interface BlockingChecker {
@@ -33,15 +35,17 @@ public interface BlockingChecker {
     public static final Object ACTION_ENTITLEMENT = "Entitlement";
     public static final Object ACTION_BILLING = "Billing";
 
-
     public interface BlockingAggregator {
+
         public boolean isBlockChange();
+
         public boolean isBlockEntitlement();
+
         public boolean isBlockBilling();
     }
 
-    // Only throws if we can't find the blockable enties
-    public BlockingAggregator getBlockedStatus(Blockable blockable, InternalTenantContext context) throws BlockingApiException;
+    public BlockingAggregator getBlockedStatus(List<BlockingState> currentAccountEntitlementStatePerService, List<BlockingState> currentBundleEntitlementStatePerService,
+                                               List<BlockingState> currentSubscriptionEntitlementStatePerService, InternalTenantContext internalTenantContext);
 
     public BlockingAggregator getBlockedStatus(final UUID blockableId, final BlockingStateType type, final InternalTenantContext context) throws BlockingApiException;
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/block/DefaultBlockingChecker.java b/entitlement/src/main/java/com/ning/billing/entitlement/block/DefaultBlockingChecker.java
index 0cfe5ea..9d87e3c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/block/DefaultBlockingChecker.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/block/DefaultBlockingChecker.java
@@ -19,6 +19,8 @@ package com.ning.billing.entitlement.block;
 import java.util.List;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
+
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.callcontext.InternalTenantContext;
@@ -32,6 +34,7 @@ import com.ning.billing.subscription.api.SubscriptionBaseInternalApi;
 import com.ning.billing.subscription.api.user.SubscriptionBaseApiException;
 import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
 
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 
 public class DefaultBlockingChecker implements BlockingChecker {
@@ -110,7 +113,6 @@ public class DefaultBlockingChecker implements BlockingChecker {
         return result;
     }
 
-
     private DefaultBlockingAggregator getBlockedStateBundleId(final UUID bundleId, final InternalTenantContext context) throws BlockingApiException {
 
         final SubscriptionBaseBundle bundle;
@@ -122,7 +124,6 @@ public class DefaultBlockingChecker implements BlockingChecker {
         }
     }
 
-
     private DefaultBlockingAggregator getBlockedStateBundle(final SubscriptionBaseBundle bundle, final InternalTenantContext context) {
         final DefaultBlockingAggregator result = getBlockedStateAccountId(bundle.getAccountId(), context);
         final DefaultBlockingAggregator bundleState = getBlockedStateForId(bundle.getId(), BlockingStateType.SUBSCRIPTION_BUNDLE, context);
@@ -143,14 +144,21 @@ public class DefaultBlockingChecker implements BlockingChecker {
         return getBlockedStateForId(accountId, BlockingStateType.ACCOUNT, context);
     }
 
-    private DefaultBlockingAggregator getBlockedStateForId(final UUID blockableId, final BlockingStateType blockingStateType, final InternalTenantContext context) {
-        final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
+    private DefaultBlockingAggregator getBlockedStateForId(@Nullable final UUID blockableId, final BlockingStateType blockingStateType, final InternalTenantContext context) {
+        // Last states across services
+        final List<BlockingState> blockableState;
         if (blockableId != null) {
-            // Last states across services
-            final List<BlockingState> blockableState = dao.getBlockingState(blockableId, blockingStateType, context);
-            for (BlockingState cur : blockableState) {
-                result.or(cur);
-            }
+            blockableState = dao.getBlockingState(blockableId, blockingStateType, context);
+        } else {
+            blockableState = ImmutableList.<BlockingState>of();
+        }
+        return getBlockedState(blockableState);
+    }
+
+    private DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> currentBlockableStatePerService) {
+        final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
+        for (final BlockingState cur : currentBlockableStatePerService) {
+            result.or(cur);
         }
         return result;
     }
@@ -167,14 +175,11 @@ public class DefaultBlockingChecker implements BlockingChecker {
     }
 
     @Override
-    public BlockingAggregator getBlockedStatus(final Blockable blockable, final InternalTenantContext context) throws BlockingApiException {
-            if (blockable instanceof SubscriptionBase) {
-                return getBlockedStateSubscription((SubscriptionBase) blockable, context);
-            } else if (blockable instanceof SubscriptionBaseBundle) {
-                return getBlockedStateBundle((SubscriptionBaseBundle) blockable, context);
-            } else { //(blockable instanceof Account) {
-                return getBlockedStateAccount((Account) blockable, context);
-            }
+    public BlockingAggregator getBlockedStatus(final List<BlockingState> accountEntitlementStates, final List<BlockingState> bundleEntitlementStates, final List<BlockingState> subscriptionEntitlementStates, final InternalTenantContext internalTenantContext) {
+        final DefaultBlockingAggregator result = getBlockedState(subscriptionEntitlementStates);
+        result.or(getBlockedState(bundleEntitlementStates));
+        result.or(getBlockedState(accountEntitlementStates));
+        return result;
     }
 
     @Override
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/dao/ProxyBlockingStateDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/dao/ProxyBlockingStateDao.java
index a154d28..11df88d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/dao/ProxyBlockingStateDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/dao/ProxyBlockingStateDao.java
@@ -218,16 +218,30 @@ public class ProxyBlockingStateDao implements BlockingStateDao {
         // Retrieve the cancellation blocking state on disk, if it exists (will be used later)
         final BlockingState cancellationBlockingStateOnDisk = findEntitlementCancellationBlockingState(blockableId, blockingStatesOnDiskCopy);
 
+        final List<EventsStream> eventsStreams;
+        try {
+            if (blockingStateType == null) {
+                // We're coming from getBlockingAllForAccountRecordId
+                eventsStreams = eventsStreamBuilder.buildForAccount(context);
+            } else {
+                // We're coming from getBlockingHistoryForService / getBlockingAll
+                eventsStreams = ImmutableList.<EventsStream>of(eventsStreamBuilder.buildForEntitlement(baseSubscriptionsToConsider.iterator().next().getId(), context));
+            }
+        } catch (EntitlementApiException e) {
+            log.error("Error computing blocking states for addons for account record id " + context.getAccountRecordId(), e);
+            throw new RuntimeException(e);
+        }
+
         // Compute the blocking states not on disk for all base subscriptions
         final DateTime now = clock.getUTCNow();
         for (final SubscriptionBase baseSubscription : baseSubscriptionsToConsider) {
-            final EventsStream eventsStream;
-            try {
-                eventsStream = eventsStreamBuilder.buildForEntitlement(baseSubscription.getId(), context);
-            } catch (EntitlementApiException e) {
-                log.error("Error computing blocking states for addons for account record id " + context.getAccountRecordId(), e);
-                throw new RuntimeException(e);
-            }
+            final EventsStream eventsStream = Iterables.<EventsStream>find(eventsStreams,
+                                                                           new Predicate<EventsStream>() {
+                                                                               @Override
+                                                                               public boolean apply(final EventsStream input) {
+                                                                                   return input.getSubscription().getId().equals(baseSubscription.getId());
+                                                                               }
+                                                                           });
 
             // First, check to see if the base entitlement is cancelled. If so, cancel the
             final Collection<BlockingState> blockingStatesNotOnDisk = eventsStream.computeAddonsBlockingStatesForFutureSubscriptionBaseEvents();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java
index 0c6e8eb..ef6aa70 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java
@@ -16,12 +16,17 @@
 
 package com.ning.billing.entitlement.engine.core;
 
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
+import org.joda.time.DateTime;
+
 import com.ning.billing.ObjectType;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
@@ -30,7 +35,6 @@ import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.clock.Clock;
 import com.ning.billing.entitlement.EntitlementService;
-import com.ning.billing.entitlement.api.BlockingApiException;
 import com.ning.billing.entitlement.api.BlockingState;
 import com.ning.billing.entitlement.api.BlockingStateType;
 import com.ning.billing.entitlement.api.EntitlementApiException;
@@ -43,8 +47,10 @@ import com.ning.billing.subscription.api.user.SubscriptionBaseApiException;
 import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.dao.NonEntityDao;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 @Singleton
@@ -55,17 +61,19 @@ public class EventsStreamBuilder {
     private final BlockingChecker checker;
     private final BlockingStateDao blockingStateDao;
     private final Clock clock;
+    private final NonEntityDao nonEntityDao;
     private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public EventsStreamBuilder(final AccountInternalApi accountInternalApi, final SubscriptionBaseInternalApi subscriptionInternalApi,
                                final BlockingChecker checker, final BlockingStateDao blockingStateDao,
-                               final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
+                               final Clock clock, final NonEntityDao nonEntityDao, final InternalCallContextFactory internalCallContextFactory) {
         this.accountInternalApi = accountInternalApi;
         this.subscriptionInternalApi = subscriptionInternalApi;
         this.checker = checker;
         this.blockingStateDao = blockingStateDao;
         this.clock = clock;
+        this.nonEntityDao = nonEntityDao;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
@@ -90,6 +98,86 @@ public class EventsStreamBuilder {
         return buildForEntitlement(entitlementId, internalTenantContext);
     }
 
+    public List<EventsStream> buildForAccount(final InternalTenantContext internalTenantContext) throws EntitlementApiException {
+        // Retrieve the subscriptions (map bundle id -> subscriptions)
+        final Map<UUID, List<SubscriptionBase>> subscriptions = subscriptionInternalApi.getSubscriptionsForAccount(internalTenantContext);
+        if (subscriptions.isEmpty()) {
+            // Bail early
+            return ImmutableList.<EventsStream>of();
+        }
+
+        // Retrieve the account
+        final UUID accountId = nonEntityDao.retrieveIdFromObject(internalTenantContext.getAccountRecordId(), ObjectType.ACCOUNT);
+        final Account account;
+        try {
+            account = accountInternalApi.getAccountById(accountId, internalTenantContext);
+        } catch (AccountApiException e) {
+            throw new EntitlementApiException(e);
+        }
+
+        // Retrieve the bundles
+        final List<SubscriptionBaseBundle> bundles = subscriptionInternalApi.getBundlesForAccount(accountId, internalTenantContext);
+        // Map bundle id -> bundles
+        final Map<UUID, SubscriptionBaseBundle> bundlesPerId = new HashMap<UUID, SubscriptionBaseBundle>();
+        for (final SubscriptionBaseBundle bundle : bundles) {
+            bundlesPerId.put(bundle.getId(), bundle);
+        }
+
+        // Retrieve the blocking states
+        final List<BlockingState> blockingStatesForAccount = blockingStateDao.getBlockingAllForAccountRecordId(internalTenantContext);
+        // Copy fully the list (avoid lazy loading)
+        final List<BlockingState> accountEntitlementStates = ImmutableList.<BlockingState>copyOf(Iterables.<BlockingState>filter(blockingStatesForAccount,
+                                                                                                                                 new Predicate<BlockingState>() {
+                                                                                                                                     @Override
+                                                                                                                                     public boolean apply(final BlockingState input) {
+                                                                                                                                         return BlockingStateType.ACCOUNT.equals(input.getType()) &&
+                                                                                                                                                EntitlementService.ENTITLEMENT_SERVICE_NAME.equals(input.getService()) &&
+                                                                                                                                                accountId.equals(input.getBlockedId());
+                                                                                                                                     }
+                                                                                                                                 }));
+
+        // Build the EventsStream objects
+        final List<EventsStream> results = new LinkedList<EventsStream>();
+        for (final UUID bundleId : subscriptions.keySet()) {
+            final SubscriptionBaseBundle bundle = bundlesPerId.get(bundleId);
+            final List<SubscriptionBase> allSubscriptionsForBundle = subscriptions.get(bundleId);
+            final SubscriptionBase baseSubscription = Iterables.<SubscriptionBase>tryFind(allSubscriptionsForBundle,
+                                                                                          new Predicate<SubscriptionBase>() {
+                                                                                              @Override
+                                                                                              public boolean apply(final SubscriptionBase input) {
+                                                                                                  return ProductCategory.BASE.equals(input.getLastActiveProduct().getCategory());
+                                                                                              }
+                                                                                          }).orNull();
+            // Copy fully the list (avoid lazy loading)
+            final List<BlockingState> bundleEntitlementStates = ImmutableList.<BlockingState>copyOf(Iterables.<BlockingState>filter(blockingStatesForAccount,
+                                                                                                                                    new Predicate<BlockingState>() {
+                                                                                                                                        @Override
+                                                                                                                                        public boolean apply(final BlockingState input) {
+                                                                                                                                            return BlockingStateType.SUBSCRIPTION_BUNDLE.equals(input.getType()) &&
+                                                                                                                                                   EntitlementService.ENTITLEMENT_SERVICE_NAME.equals(input.getService()) &&
+                                                                                                                                                   bundle.getId().equals(input.getBlockedId());
+                                                                                                                                        }
+                                                                                                                                    }));
+
+            for (final SubscriptionBase subscriptionBase : allSubscriptionsForBundle) {
+                // Copy fully the list (avoid lazy loading)
+                final List<BlockingState> subscriptionEntitlementStates = ImmutableList.<BlockingState>copyOf(Iterables.<BlockingState>filter(blockingStatesForAccount,
+                                                                                                                                              new Predicate<BlockingState>() {
+                                                                                                                                                  @Override
+                                                                                                                                                  public boolean apply(final BlockingState input) {
+                                                                                                                                                      return BlockingStateType.SUBSCRIPTION.equals(input.getType()) &&
+                                                                                                                                                             EntitlementService.ENTITLEMENT_SERVICE_NAME.equals(input.getService()) &&
+                                                                                                                                                             subscriptionBase.getId().equals(input.getBlockedId());
+                                                                                                                                                  }
+                                                                                                                                              }));
+
+                results.add(buildForEntitlement(account, bundle, baseSubscription, subscriptionBase, allSubscriptionsForBundle, subscriptionEntitlementStates, bundleEntitlementStates, accountEntitlementStates, internalTenantContext));
+            }
+        }
+
+        return results;
+    }
+
     public EventsStream buildForEntitlement(final UUID entitlementId, final InternalTenantContext internalTenantContext) throws EntitlementApiException {
         final SubscriptionBaseBundle bundle;
         final SubscriptionBase subscription;
@@ -125,16 +213,26 @@ public class EventsStreamBuilder {
             throw new EntitlementApiException(e);
         }
 
-        final List<BlockingState> subscriptionEntitlementStates = blockingStateDao.getBlockingHistoryForService(subscription.getId(), BlockingStateType.SUBSCRIPTION, EntitlementService.ENTITLEMENT_SERVICE_NAME, internalTenantContext);
         final List<BlockingState> bundleEntitlementStates = blockingStateDao.getBlockingHistoryForService(bundle.getId(), BlockingStateType.SUBSCRIPTION_BUNDLE, EntitlementService.ENTITLEMENT_SERVICE_NAME, internalTenantContext);
         final List<BlockingState> accountEntitlementStates = blockingStateDao.getBlockingHistoryForService(account.getId(), BlockingStateType.ACCOUNT, EntitlementService.ENTITLEMENT_SERVICE_NAME, internalTenantContext);
+        final List<BlockingState> subscriptionEntitlementStates = blockingStateDao.getBlockingHistoryForService(subscription.getId(), BlockingStateType.SUBSCRIPTION, EntitlementService.ENTITLEMENT_SERVICE_NAME, internalTenantContext);
 
-        final BlockingAggregator blockingAggregator;
-        try {
-            blockingAggregator = checker.getBlockedStatus(subscription, internalTenantContext);
-        } catch (BlockingApiException e) {
-            throw new EntitlementApiException(e);
-        }
+        return buildForEntitlement(account, bundle, baseSubscription, subscription, allSubscriptionsForBundle, subscriptionEntitlementStates, bundleEntitlementStates, accountEntitlementStates, internalTenantContext);
+    }
+
+    private EventsStream buildForEntitlement(final Account account,
+                                             final SubscriptionBaseBundle bundle,
+                                             final SubscriptionBase baseSubscription,
+                                             final SubscriptionBase subscription,
+                                             final List<SubscriptionBase> allSubscriptionsForBundle,
+                                             final List<BlockingState> subscriptionEntitlementStates,
+                                             final List<BlockingState> bundleEntitlementStates,
+                                             final List<BlockingState> accountEntitlementStates,
+                                             final InternalTenantContext internalTenantContext) throws EntitlementApiException {
+        final BlockingAggregator blockingAggregator = checker.getBlockedStatus(filterCurrentBlockableStatePerService(accountEntitlementStates),
+                                                                               filterCurrentBlockableStatePerService(bundleEntitlementStates),
+                                                                               filterCurrentBlockableStatePerService(subscriptionEntitlementStates),
+                                                                               internalTenantContext);
 
         return new EventsStream(account,
                                 bundle,
@@ -148,4 +246,22 @@ public class EventsStreamBuilder {
                                 internalTenantContext,
                                 clock.getUTCNow());
     }
+
+    private List<BlockingState> filterCurrentBlockableStatePerService(final Iterable<BlockingState> allBlockingStates) {
+        final DateTime now = clock.getUTCNow();
+
+        final Map<String, BlockingState> currentBlockingStatePerService = new HashMap<String, BlockingState>();
+        for (final BlockingState blockingState : allBlockingStates) {
+            if (blockingState.getEffectiveDate().isAfter(now)) {
+                continue;
+            }
+
+            if (currentBlockingStatePerService.get(blockingState.getService()) == null ||
+                currentBlockingStatePerService.get(blockingState.getService()).getEffectiveDate().isBefore(blockingState.getEffectiveDate())) {
+                currentBlockingStatePerService.put(blockingState.getService(), blockingState);
+            }
+        }
+
+        return ImmutableList.<BlockingState>copyOf(currentBlockingStatePerService.values());
+    }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/block/MockBlockingChecker.java b/entitlement/src/test/java/com/ning/billing/entitlement/block/MockBlockingChecker.java
index 1a97df2..c5232db 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/block/MockBlockingChecker.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/block/MockBlockingChecker.java
@@ -16,17 +16,19 @@
 
 package com.ning.billing.entitlement.block;
 
+import java.util.List;
 import java.util.UUID;
 
 import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.entitlement.api.Blockable;
 import com.ning.billing.entitlement.api.BlockingApiException;
+import com.ning.billing.entitlement.api.BlockingState;
 import com.ning.billing.entitlement.api.BlockingStateType;
 
 public class MockBlockingChecker implements BlockingChecker {
 
     @Override
-    public BlockingAggregator getBlockedStatus(final Blockable blockable, final InternalTenantContext context) throws BlockingApiException {
+    public BlockingAggregator getBlockedStatus(final List<BlockingState> accountEntitlementStates, final List<BlockingState> bundleEntitlementStates, final List<BlockingState> subscriptionEntitlementStates, final InternalTenantContext internalTenantContext) {
         return null;
     }