killbill-memoizeit

analytics: proposal on how to speed up bst and bii* refreshes Signed-off-by:

4/30/2013 4:25:16 PM

Details

diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsRefreshException.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsRefreshException.java
index 5a54d5a..b3ab241 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsRefreshException.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsRefreshException.java
@@ -16,14 +16,12 @@
 
 package com.ning.billing.osgi.bundles.analytics;
 
-import com.ning.billing.BillingExceptionBase;
-
 public class AnalyticsRefreshException extends Exception {
 
     public AnalyticsRefreshException() {
     }
 
-    public AnalyticsRefreshException(final BillingExceptionBase e) {
+    public AnalyticsRefreshException(final Exception e) {
         super(e);
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessFieldFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessFieldFactory.java
index ce8adab..9a8c0de 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessFieldFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessFieldFactory.java
@@ -48,6 +48,7 @@ public class BusinessFieldFactory extends BusinessFactoryBase {
         final Collection<CustomField> fields = getFieldsForAccount(account.getId(), context);
 
         final Collection<BusinessFieldModelDao> fieldModelDaos = new LinkedList<BusinessFieldModelDao>();
+        // We process custom fields sequentially: in practice, an account will be associated with a dozen fields at most
         for (final CustomField field : fields) {
             final Long customFieldRecordId = getFieldRecordId(field.getId(), context);
             final AuditLog creationAuditLog = getFieldCreationAuditLog(field.getId(), context);
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
index 5dfcc22..5f85d6b 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
@@ -21,6 +21,12 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -59,9 +65,12 @@ import static com.ning.billing.osgi.bundles.analytics.utils.BusinessInvoiceUtils
 
 public class BusinessInvoiceFactory extends BusinessFactoryBase {
 
+    private final Executor executor;
+
     public BusinessInvoiceFactory(final OSGIKillbillLogService logService,
                                   final OSGIKillbillAPI osgiKillbillAPI) {
         super(logService, osgiKillbillAPI);
+        executor = Executors.newFixedThreadPool(20);
     }
 
     /**
@@ -93,26 +102,33 @@ public class BusinessInvoiceFactory extends BusinessFactoryBase {
         }
 
         // Create the business invoice items
+        final CompletionService<BusinessInvoiceItemBaseModelDao> completionService = new ExecutorCompletionService<BusinessInvoiceItemBaseModelDao>(executor);
         final Multimap<UUID, BusinessInvoiceItemBaseModelDao> businessInvoiceItemsForInvoiceId = ArrayListMultimap.<UUID, BusinessInvoiceItemBaseModelDao>create();
         for (final InvoiceItem invoiceItem : allInvoiceItems.values()) {
-            final Invoice invoice = invoiceIdToInvoiceMappings.get(invoiceItem.getInvoiceId());
-            final Collection<InvoiceItem> otherInvoiceItems = Collections2.filter(allInvoiceItems.values(),
-                                                                                  new Predicate<InvoiceItem>() {
-                                                                                      @Override
-                                                                                      public boolean apply(final InvoiceItem input) {
-                                                                                          return input.getId() != null && !input.getId().equals(invoiceItem.getId());
-                                                                                      }
-                                                                                  });
-            final BusinessInvoiceItemBaseModelDao businessInvoiceItem = createBusinessInvoiceItem(account,
-                                                                                                  invoice,
-                                                                                                  invoiceItem,
-                                                                                                  otherInvoiceItems,
-                                                                                                  accountRecordId,
-                                                                                                  tenantRecordId,
-                                                                                                  reportGroup,
-                                                                                                  context);
-            if (businessInvoiceItem != null) {
-                businessInvoiceItemsForInvoiceId.get(invoice.getId()).add(businessInvoiceItem);
+            completionService.submit(new Callable<BusinessInvoiceItemBaseModelDao>() {
+                @Override
+                public BusinessInvoiceItemBaseModelDao call() throws Exception {
+                    return createBusinessInvoiceItem(invoiceItem,
+                                                     allInvoiceItems,
+                                                     invoiceIdToInvoiceMappings,
+                                                     account,
+                                                     accountRecordId,
+                                                     tenantRecordId,
+                                                     reportGroup,
+                                                     context);
+                }
+            });
+        }
+        for (int i = 0; i < allInvoiceItems.values().size(); ++i) {
+            try {
+                final BusinessInvoiceItemBaseModelDao businessInvoiceItemModelDao = completionService.take().get();
+                if (businessInvoiceItemModelDao != null) {
+                    businessInvoiceItemsForInvoiceId.get(businessInvoiceItemModelDao.getInvoiceId()).add(businessInvoiceItemModelDao);
+                }
+            } catch (InterruptedException e) {
+                throw new AnalyticsRefreshException(e);
+            } catch (ExecutionException e) {
+                throw new AnalyticsRefreshException(e);
             }
         }
 
@@ -136,6 +152,32 @@ public class BusinessInvoiceFactory extends BusinessFactoryBase {
         return businessRecords;
     }
 
+    private BusinessInvoiceItemBaseModelDao createBusinessInvoiceItem(final InvoiceItem invoiceItem,
+                                                                      final Multimap<UUID, InvoiceItem> allInvoiceItems,
+                                                                      final Map<UUID, Invoice> invoiceIdToInvoiceMappings,
+                                                                      final Account account,
+                                                                      final Long accountRecordId,
+                                                                      final Long tenantRecordId,
+                                                                      final ReportGroup reportGroup,
+                                                                      final CallContext context) throws AnalyticsRefreshException {
+        final Invoice invoice = invoiceIdToInvoiceMappings.get(invoiceItem.getInvoiceId());
+        final Collection<InvoiceItem> otherInvoiceItems = Collections2.filter(allInvoiceItems.values(),
+                                                                              new Predicate<InvoiceItem>() {
+                                                                                  @Override
+                                                                                  public boolean apply(final InvoiceItem input) {
+                                                                                      return input.getId() != null && !input.getId().equals(invoiceItem.getId());
+                                                                                  }
+                                                                              });
+        return createBusinessInvoiceItem(account,
+                                         invoice,
+                                         invoiceItem,
+                                         otherInvoiceItems,
+                                         accountRecordId,
+                                         tenantRecordId,
+                                         reportGroup,
+                                         context);
+    }
+
     private BusinessInvoiceModelDao createBusinessInvoice(final Account account,
                                                           final Invoice invoice,
                                                           final Long accountRecordId,
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
index 7cbc5f4..1a9c372 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
@@ -20,6 +20,12 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -39,45 +45,82 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
 
 public class BusinessSubscriptionTransitionFactory extends BusinessFactoryBase {
 
+    private final Executor executor;
+
     public BusinessSubscriptionTransitionFactory(final OSGIKillbillLogService logService,
                                                  final OSGIKillbillAPI osgiKillbillAPI) {
         super(logService, osgiKillbillAPI);
+        executor = Executors.newFixedThreadPool(20);
     }
 
     public Collection<BusinessSubscriptionTransitionModelDao> createBusinessSubscriptionTransitions(final UUID accountId,
                                                                                                     final Long accountRecordId,
                                                                                                     final Long tenantRecordId,
                                                                                                     final CallContext context) throws AnalyticsRefreshException {
+        // We build bsts for each subscription in parallel - we don't care about the overall ordering but we do care about ordering for
+        // a given subscription (we'd like the generated record ids to be sequential).
+        final CompletionService<Collection<BusinessSubscriptionTransitionModelDao>> completionService = new ExecutorCompletionService<Collection<BusinessSubscriptionTransitionModelDao>>(executor);
+
         final Account account = getAccount(accountId, context);
         final ReportGroup reportGroup = getReportGroup(account.getId(), context);
 
-        final Collection<BusinessSubscriptionTransitionModelDao> bsts = new LinkedList<BusinessSubscriptionTransitionModelDao>();
-
+        int nbSubscriptions = 0;
         final List<SubscriptionBundle> bundles = getSubscriptionBundlesForAccount(account.getId(), context);
         for (final SubscriptionBundle bundle : bundles) {
             final Collection<Subscription> subscriptions = getSubscriptionsForBundle(bundle.getId(), context);
+            nbSubscriptions += subscriptions.size();
+
             for (final Subscription subscription : subscriptions) {
-                final List<SubscriptionTransition> transitions = subscription.getAllTransitions();
-
-                BusinessSubscription prevNextSubscription = null;
-
-                // Ordered for us by entitlement
-                for (final SubscriptionTransition transition : transitions) {
-                    final BusinessSubscription nextSubscription = getBusinessSubscriptionFromTransition(account, transition);
-                    final BusinessSubscriptionTransitionModelDao bst = createBusinessSubscriptionTransition(account,
-                                                                                                            accountRecordId,
-                                                                                                            bundle,
-                                                                                                            transition,
-                                                                                                            prevNextSubscription,
-                                                                                                            nextSubscription,
-                                                                                                            tenantRecordId,
-                                                                                                            reportGroup,
-                                                                                                            context);
-                    if (bst != null) {
-                        bsts.add(bst);
-                        prevNextSubscription = nextSubscription;
+                completionService.submit(new Callable<Collection<BusinessSubscriptionTransitionModelDao>>() {
+                    @Override
+                    public Collection<BusinessSubscriptionTransitionModelDao> call() throws Exception {
+                        return buildTransitionsForSubscription(account, bundle, subscription, accountRecordId, tenantRecordId, reportGroup, context);
                     }
-                }
+                });
+            }
+        }
+
+        final Collection<BusinessSubscriptionTransitionModelDao> bsts = new LinkedList<BusinessSubscriptionTransitionModelDao>();
+        for (int i = 0; i < nbSubscriptions; ++i) {
+            try {
+                bsts.addAll(completionService.take().get());
+            } catch (InterruptedException e) {
+                throw new AnalyticsRefreshException(e);
+            } catch (ExecutionException e) {
+                throw new AnalyticsRefreshException(e);
+            }
+        }
+        return bsts;
+    }
+
+    private Collection<BusinessSubscriptionTransitionModelDao> buildTransitionsForSubscription(final Account account,
+                                                                                               final SubscriptionBundle bundle,
+                                                                                               final Subscription subscription,
+                                                                                               final Long accountRecordId,
+                                                                                               final Long tenantRecordId,
+                                                                                               @Nullable final ReportGroup reportGroup,
+                                                                                               final CallContext context) throws AnalyticsRefreshException {
+        final Collection<BusinessSubscriptionTransitionModelDao> bsts = new LinkedList<BusinessSubscriptionTransitionModelDao>();
+
+        final List<SubscriptionTransition> transitions = subscription.getAllTransitions();
+
+        BusinessSubscription prevNextSubscription = null;
+
+        // Ordered for us by entitlement
+        for (final SubscriptionTransition transition : transitions) {
+            final BusinessSubscription nextSubscription = getBusinessSubscriptionFromTransition(account, transition);
+            final BusinessSubscriptionTransitionModelDao bst = createBusinessSubscriptionTransition(account,
+                                                                                                    accountRecordId,
+                                                                                                    bundle,
+                                                                                                    transition,
+                                                                                                    prevNextSubscription,
+                                                                                                    nextSubscription,
+                                                                                                    tenantRecordId,
+                                                                                                    reportGroup,
+                                                                                                    context);
+            if (bst != null) {
+                bsts.add(bst);
+                prevNextSubscription = nextSubscription;
             }
         }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessTagFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessTagFactory.java
index 1df645d..a365962 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessTagFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessTagFactory.java
@@ -49,6 +49,7 @@ public class BusinessTagFactory extends BusinessFactoryBase {
         final Collection<Tag> tags = getTagsForAccount(account.getId(), context);
 
         final Collection<BusinessTagModelDao> tagModelDaos = new LinkedList<BusinessTagModelDao>();
+        // We process tags sequentially: in practice, an account will be associated with a dozen tags at most
         for (final Tag tag : tags) {
             final Long tagRecordId = getTagRecordId(tag.getId(), context);
             final TagDefinition tagDefinition = getTagDefinition(tag.getTagDefinitionId(), context);