killbill-uncached

analytics: revisit the parallelism Make sure the parallelism

5/2/2013 8:58:46 PM

Changes

osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/AdjustmentInvoiceItemForRepair.java 135(+0 -135)

Details

diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
index d177004..f3879c2 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
@@ -17,6 +17,7 @@
 package com.ning.billing.osgi.bundles.analytics;
 
 import java.util.Hashtable;
+import java.util.concurrent.Executor;
 
 import javax.servlet.Servlet;
 import javax.servlet.http.HttpServlet;
@@ -39,10 +40,12 @@ public class AnalyticsActivator extends KillbillActivatorBase {
     public void start(final BundleContext context) throws Exception {
         super.start(context);
 
-        analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource);
+        final Executor executor = BusinessExecutor.create(logService);
+
+        analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource, executor);
         dispatcher.registerEventHandler(analyticsListener);
 
-        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, dataSource);
+        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, dataSource, executor);
         final AnalyticsServlet analyticsServlet = new AnalyticsServlet(analyticsUserApi, logService);
         registerServlet(context, analyticsServlet);
     }
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
index 5e2afc3..c636c01 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics;
 
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -59,16 +60,17 @@ public class AnalyticsListener implements OSGIKillbillEventHandler {
 
     public AnalyticsListener(final OSGIKillbillLogService logService,
                              final OSGIKillbillAPI osgiKillbillAPI,
-                             final OSGIKillbillDataSource osgiKillbillDataSource) {
+                             final OSGIKillbillDataSource osgiKillbillDataSource,
+                             final Executor executor) {
         this.logService = logService;
 
-        this.bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
-        this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
-        this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
-        this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+        this.bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
+        this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+        this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+        this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
         this.bFieldDao = new BusinessFieldDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
         this.bTagDao = new BusinessTagDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
-        this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+        this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
 
         // TODO Do we still need it?
         this.locker = new MySqlGlobalLocker(osgiKillbillDataSource.getDataSource());
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
index b2ac8b4..f2cb9f2 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.api.user;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import com.ning.billing.osgi.bundles.analytics.AnalyticsRefreshException;
 import com.ning.billing.osgi.bundles.analytics.api.BusinessAccount;
@@ -43,9 +44,10 @@ public class AnalyticsUserApi {
 
     public AnalyticsUserApi(final OSGIKillbillLogService logService,
                             final OSGIKillbillAPI osgiKillbillAPI,
-                            final OSGIKillbillDataSource osgiKillbillDataSource) {
+                            final OSGIKillbillDataSource osgiKillbillDataSource,
+                            final Executor executor) {
         this.analyticsDao = new AnalyticsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
-        this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+        this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
     }
 
     public BusinessSnapshot getBusinessSnapshot(final UUID accountId, final TenantContext context) {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
new file mode 100644
index 0000000..75c8a98
--- /dev/null
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.osgi.bundles.analytics;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.osgi.service.log.LogService;
+
+import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
+
+public class BusinessExecutor extends ThreadPoolExecutor {
+
+    private static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
+
+    private final OSGIKillbillLogService logService;
+
+    public static BusinessExecutor create(final OSGIKillbillLogService logService) {
+        return new BusinessExecutor(0,
+                                    NB_THREADS,
+                                    60L,
+                                    TimeUnit.SECONDS,
+                                    new SynchronousQueue<Runnable>(),
+                                    new NamedThreadFactory("osgi-analytics"),
+                                    logService);
+    }
+
+    public BusinessExecutor(final int corePoolSize,
+                            final int maximumPoolSize,
+                            final long keepAliveTime,
+                            final TimeUnit unit,
+                            final BlockingQueue<Runnable> workQueue,
+                            final ThreadFactory threadFactory,
+                            final OSGIKillbillLogService logService) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        this.logService = logService;
+    }
+
+    @Override
+    public <T> Future<T> submit(final Callable<T> task) {
+        return super.submit(WrappedCallable.wrap(logService, task));
+    }
+
+    @Override
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        // HACK: assumes ThreadPoolExecutor will create a callable and call execute()
+        // (can't wrap the runnable here or exception isn't re-thrown when Future.get() is called)
+        return super.submit(task, result);
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+        return super.submit(WrappedRunnable.wrap(logService, task));
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+        super.execute(WrappedRunnable.wrap(logService, command));
+    }
+
+    private static class WrappedCallable<T> implements Callable<T> {
+
+        private final OSGIKillbillLogService logService;
+        private final Callable<T> callable;
+
+        private WrappedCallable(final OSGIKillbillLogService logService, final Callable<T> callable) {
+            this.logService = logService;
+            this.callable = callable;
+        }
+
+        public static <T> Callable<T> wrap(final OSGIKillbillLogService logService, final Callable<T> callable) {
+            return callable instanceof WrappedCallable ? callable : new WrappedCallable<T>(logService, callable);
+        }
+
+        @Override
+        public T call() throws Exception {
+            final Thread currentThread = Thread.currentThread();
+
+            try {
+                return callable.call();
+            } catch (Exception e) {
+                // since callables are expected to sometimes throw exceptions, log this at DEBUG instead of ERROR
+                logService.log(LogService.LOG_DEBUG, currentThread + " ended with an exception", e);
+
+                throw e;
+            } catch (Error e) {
+                logService.log(LogService.LOG_ERROR, currentThread + " ended with an exception", e);
+
+                throw e;
+            } finally {
+                logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
+            }
+        }
+    }
+
+    private static class WrappedRunnable implements Runnable {
+
+        private final OSGIKillbillLogService logService;
+        private final Runnable runnable;
+
+        private WrappedRunnable(final OSGIKillbillLogService logService, final Runnable runnable) {
+            this.logService = logService;
+            this.runnable = runnable;
+        }
+
+        public static Runnable wrap(final OSGIKillbillLogService logService, final Runnable runnable) {
+            return runnable instanceof WrappedRunnable ? runnable : new WrappedRunnable(logService, runnable);
+        }
+
+        @Override
+        public void run() {
+            final Thread currentThread = Thread.currentThread();
+
+            try {
+                runnable.run();
+            } catch (Throwable e) {
+                logService.log(LogService.LOG_ERROR, currentThread + " ended abnormally with an exception", e);
+            }
+
+            logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
+        }
+    }
+
+    /**
+     * Factory that sets the name of each thread it creates to {@code [name]-[id]}.
+     * This makes debugging stack traces much easier.
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+
+        private final AtomicInteger count = new AtomicInteger(0);
+        private final String name;
+
+        public NamedThreadFactory(final String name) {
+            this.name = name;
+        }
+
+        @Override
+        public Thread newThread(final Runnable runnable) {
+            final Thread thread = new Thread(runnable);
+
+            thread.setName(name + "-" + count.incrementAndGet());
+
+            return thread;
+        }
+    }
+}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
index 49f95d8..13401e1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
@@ -17,6 +17,7 @@
 package com.ning.billing.osgi.bundles.analytics.dao;
 
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.osgi.service.log.LogService;
 
@@ -38,13 +39,14 @@ public class AllBusinessObjectsDao {
 
     public AllBusinessObjectsDao(final OSGIKillbillLogService logService,
                                  final OSGIKillbillAPI osgiKillbillAPI,
-                                 final OSGIKillbillDataSource osgiKillbillDataSource) {
+                                 final OSGIKillbillDataSource osgiKillbillDataSource,
+                                 final Executor executor) {
         this.logService = logService;
 
-        final BusinessAccountDao bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
-        this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
-        this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
-        this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+        final BusinessAccountDao bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
+        this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+        this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+        this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
         this.bFieldDao = new BusinessFieldDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
         this.bTagDao = new BusinessTagDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
     }
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
index de5eb53..f28be62 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
@@ -17,6 +17,7 @@
 package com.ning.billing.osgi.bundles.analytics.dao;
 
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.osgi.service.log.LogService;
 import org.skife.jdbi.v2.Transaction;
@@ -36,9 +37,10 @@ public class BusinessAccountDao extends BusinessAnalyticsDaoBase {
 
     public BusinessAccountDao(final OSGIKillbillLogService logService,
                               final OSGIKillbillAPI osgiKillbillAPI,
-                              final OSGIKillbillDataSource osgiKillbillDataSource) {
+                              final OSGIKillbillDataSource osgiKillbillDataSource,
+                              final Executor executor) {
         super(logService, osgiKillbillDataSource);
-        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
+        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
     }
 
     public void update(final UUID accountId, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
index 7a92f05..b8d23a0 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
@@ -25,7 +25,8 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
 
 public class BusinessBundleSummaryDao extends BusinessAnalyticsDaoBase {
 
-    public BusinessBundleSummaryDao(final OSGIKillbillLogService logService, final OSGIKillbillDataSource osgiKillbillDataSource) {
+    public BusinessBundleSummaryDao(final OSGIKillbillLogService logService,
+                                    final OSGIKillbillDataSource osgiKillbillDataSource) {
         super(logService, osgiKillbillDataSource);
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
index 3f14461..dced685 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.osgi.service.log.LogService;
 import org.skife.jdbi.v2.Transaction;
@@ -61,13 +62,14 @@ public class BusinessInvoiceAndInvoicePaymentDao extends BusinessAnalyticsDaoBas
     public BusinessInvoiceAndInvoicePaymentDao(final OSGIKillbillLogService logService,
                                                final OSGIKillbillAPI osgiKillbillAPI,
                                                final OSGIKillbillDataSource osgiKillbillDataSource,
-                                               final BusinessAccountDao businessAccountDao) {
+                                               final BusinessAccountDao businessAccountDao,
+                                               final Executor executor) {
         super(logService, osgiKillbillDataSource);
         this.businessAccountDao = businessAccountDao;
         this.businessInvoiceDao = new BusinessInvoiceDao(logService, osgiKillbillDataSource);
         this.businessInvoicePaymentDao = new BusinessInvoicePaymentDao(logService, osgiKillbillDataSource);
-        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
-        binFactory = new BusinessInvoiceFactory(logService, osgiKillbillAPI);
+        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
+        binFactory = new BusinessInvoiceFactory(logService, osgiKillbillAPI, executor);
         bipFactory = new BusinessInvoicePaymentFactory(logService, osgiKillbillAPI);
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
index d1cc126..4678dbc 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.dao;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.osgi.service.log.LogService;
 import org.skife.jdbi.v2.Transaction;
@@ -39,10 +40,11 @@ public class BusinessOverdueStatusDao extends BusinessAnalyticsDaoBase {
 
     public BusinessOverdueStatusDao(final OSGIKillbillLogService logService,
                                     final OSGIKillbillAPI osgiKillbillAPI,
-                                    final OSGIKillbillDataSource osgiKillbillDataSource) {
+                                    final OSGIKillbillDataSource osgiKillbillDataSource,
+                                    final Executor executor) {
         super(logService, osgiKillbillDataSource);
         this.logService = logService;
-        bosFactory = new BusinessOverdueStatusFactory(logService, osgiKillbillAPI);
+        bosFactory = new BusinessOverdueStatusFactory(logService, osgiKillbillAPI, executor);
     }
 
     public void update(final UUID accountId, final ObjectType objectType, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
index fc77ed9..ac7b8c1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.dao;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import org.osgi.service.log.LogService;
 import org.skife.jdbi.v2.Transaction;
@@ -46,13 +47,14 @@ public class BusinessSubscriptionTransitionDao extends BusinessAnalyticsDaoBase 
     public BusinessSubscriptionTransitionDao(final OSGIKillbillLogService logService,
                                              final OSGIKillbillAPI osgiKillbillAPI,
                                              final OSGIKillbillDataSource osgiKillbillDataSource,
-                                             final BusinessAccountDao businessAccountDao) {
+                                             final BusinessAccountDao businessAccountDao,
+                                             final Executor executor) {
         super(logService, osgiKillbillDataSource);
         this.businessAccountDao = businessAccountDao;
         this.businessBundleSummaryDao = new BusinessBundleSummaryDao(logService, osgiKillbillDataSource);
-        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
-        bbsFactory = new BusinessBundleSummaryFactory(logService, osgiKillbillAPI);
-        bstFactory = new BusinessSubscriptionTransitionFactory(logService, osgiKillbillAPI);
+        bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
+        bbsFactory = new BusinessBundleSummaryFactory(logService, osgiKillbillAPI, executor);
+        bstFactory = new BusinessSubscriptionTransitionFactory(logService, osgiKillbillAPI, executor);
     }
 
     public void update(final UUID accountId, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
index 3154362..982f725 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
@@ -20,6 +20,12 @@ import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.ning.billing.account.api.Account;
 import com.ning.billing.catalog.api.ProductCategory;
@@ -37,9 +43,13 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
 
 public class BusinessAccountFactory extends BusinessFactoryBase {
 
+    private final Executor executor;
+
     public BusinessAccountFactory(final OSGIKillbillLogService logService,
-                                  final OSGIKillbillAPI osgiKillbillAPI) {
+                                  final OSGIKillbillAPI osgiKillbillAPI,
+                                  final Executor executor) {
         super(logService, osgiKillbillAPI);
+        this.executor = executor;
     }
 
     public BusinessAccountModelDao createBusinessAccount(final UUID accountId,
@@ -72,15 +82,34 @@ public class BusinessAccountFactory extends BusinessFactoryBase {
             }
         }
 
+        // We fetch the subscriptions in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+        final CompletionService<Void> completionService = new ExecutorCompletionService<Void>(executor);
         final List<SubscriptionBundle> bundles = getSubscriptionBundlesForAccount(account.getId(), context);
-        int nbActiveBundles = 0;
+        final AtomicInteger nbActiveBundles = new AtomicInteger(0);
         for (final SubscriptionBundle bundle : bundles) {
-            final Collection<Subscription> subscriptionsForBundle = getSubscriptionsForBundle(bundle.getId(), context);
-            for (final Subscription subscription : subscriptionsForBundle) {
-                if (ProductCategory.BASE.equals(subscription.getCategory()) &&
-                    !(subscription.getEndDate() != null && !subscription.getEndDate().isAfterNow())) {
-                    nbActiveBundles++;
+            completionService.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    final Collection<Subscription> subscriptionsForBundle = getSubscriptionsForBundle(bundle.getId(), context);
+                    for (final Subscription subscription : subscriptionsForBundle) {
+                        if (ProductCategory.BASE.equals(subscription.getCategory()) &&
+                            !(subscription.getEndDate() != null && !subscription.getEndDate().isAfterNow())) {
+                            nbActiveBundles.incrementAndGet();
+                            break;
+                        }
+                    }
+
+                    return null;
                 }
+            });
+        }
+        for (final SubscriptionBundle ignored : bundles) {
+            try {
+                completionService.take().get();
+            } catch (InterruptedException e) {
+                throw new AnalyticsRefreshException(e);
+            } catch (ExecutionException e) {
+                throw new AnalyticsRefreshException(e);
             }
         }
 
@@ -93,7 +122,7 @@ public class BusinessAccountFactory extends BusinessFactoryBase {
                                            accountBalance,
                                            lastInvoice,
                                            lastPayment,
-                                           nbActiveBundles,
+                                           nbActiveBundles.get(),
                                            creationAuditLog,
                                            tenantRecordId,
                                            reportGroup);
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
index 57a7599..4c25d75 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
@@ -22,6 +22,11 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
 
 import com.ning.billing.account.api.Account;
 import com.ning.billing.catalog.api.ProductCategory;
@@ -42,9 +47,13 @@ import com.google.common.collect.Ordering;
 
 public class BusinessBundleSummaryFactory extends BusinessFactoryBase {
 
+    private final Executor executor;
+
     public BusinessBundleSummaryFactory(final OSGIKillbillLogService logService,
-                                        final OSGIKillbillAPI osgiKillbillAPI) {
+                                        final OSGIKillbillAPI osgiKillbillAPI,
+                                        final Executor executor) {
         super(logService, osgiKillbillAPI);
+        this.executor = executor;
     }
 
     public Collection<BusinessBundleSummaryModelDao> createBusinessBundleSummaries(final UUID accountId,
@@ -59,17 +68,33 @@ public class BusinessBundleSummaryFactory extends BusinessFactoryBase {
         final Map<UUID, BusinessSubscriptionTransitionModelDao> bstForBundle = new LinkedHashMap<UUID, BusinessSubscriptionTransitionModelDao>();
         filterBstsForBasePlans(bsts, rankForBundle, bstForBundle);
 
+        // We fetch the bundles in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+        final CompletionService<BusinessBundleSummaryModelDao> completionService = new ExecutorCompletionService<BusinessBundleSummaryModelDao>(executor);
         final Collection<BusinessBundleSummaryModelDao> bbss = new LinkedList<BusinessBundleSummaryModelDao>();
         for (final BusinessSubscriptionTransitionModelDao bst : bstForBundle.values()) {
-            final BusinessBundleSummaryModelDao bbs = buildBBS(account,
-                                                               accountRecordId,
-                                                               bst,
-                                                               rankForBundle.get(bst.getBundleId()),
-                                                               tenantRecordId,
-                                                               reportGroup,
-                                                               context);
-            bbss.add(bbs);
+            completionService.submit(new Callable<BusinessBundleSummaryModelDao>() {
+                @Override
+                public BusinessBundleSummaryModelDao call() throws Exception {
+                    return buildBBS(account,
+                                    accountRecordId,
+                                    bst,
+                                    rankForBundle.get(bst.getBundleId()),
+                                    tenantRecordId,
+                                    reportGroup,
+                                    context);
+                }
+            });
+        }
+        for (final BusinessSubscriptionTransitionModelDao ignored : bstForBundle.values()) {
+            try {
+                bbss.add(completionService.take().get());
+            } catch (InterruptedException e) {
+                throw new AnalyticsRefreshException(e);
+            } catch (ExecutionException e) {
+                throw new AnalyticsRefreshException(e);
+            }
         }
+
         return bbss;
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
index 3174f6d..e2942b1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -65,14 +64,13 @@ import static com.ning.billing.osgi.bundles.analytics.utils.BusinessInvoiceUtils
 
 public class BusinessInvoiceFactory extends BusinessFactoryBase {
 
-    private static final int NB_THREADS = 20;
-
     private final Executor executor;
 
     public BusinessInvoiceFactory(final OSGIKillbillLogService logService,
-                                  final OSGIKillbillAPI osgiKillbillAPI) {
+                                  final OSGIKillbillAPI osgiKillbillAPI,
+                                  final Executor executor) {
         super(logService, osgiKillbillAPI);
-        executor = Executors.newFixedThreadPool(NB_THREADS);
+        this.executor = executor;
     }
 
     /**
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
index d346cc1..94a395a 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
@@ -20,6 +20,11 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
 
 import org.joda.time.DateTime;
 
@@ -39,18 +44,40 @@ import com.google.common.collect.Lists;
 
 public class BusinessOverdueStatusFactory extends BusinessFactoryBase {
 
+    private final Executor executor;
+
     public BusinessOverdueStatusFactory(final OSGIKillbillLogService logService,
-                                        final OSGIKillbillAPI osgiKillbillAPI) {
+                                        final OSGIKillbillAPI osgiKillbillAPI,
+                                        final Executor executor) {
         super(logService, osgiKillbillAPI);
+        this.executor = executor;
     }
 
     public Collection<BusinessOverdueStatusModelDao> createBusinessOverdueStatuses(final UUID accountId,
                                                                                    final CallContext context) throws AnalyticsRefreshException {
+        // We fetch the bundles in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+        // We don't care about the overall ordering but we do care about ordering for
+        // a given bundle (we'd like the generated record ids to be sequential).
+        final CompletionService<Collection<BusinessOverdueStatusModelDao>> completionService = new ExecutorCompletionService<Collection<BusinessOverdueStatusModelDao>>(executor);
         final Collection<SubscriptionBundle> bundles = getSubscriptionBundlesForAccount(accountId, context);
         final Collection<BusinessOverdueStatusModelDao> businessOverdueStatuses = new LinkedList<BusinessOverdueStatusModelDao>();
         for (final SubscriptionBundle bundle : bundles) {
-            // Recompute all blocking states for that bundle
-            businessOverdueStatuses.addAll(createBusinessOverdueStatusesForBundle(accountId, bundle, context));
+            completionService.submit(new Callable<Collection<BusinessOverdueStatusModelDao>>() {
+                @Override
+                public Collection<BusinessOverdueStatusModelDao> call() throws Exception {
+                    // Recompute all blocking states for that bundle
+                    return createBusinessOverdueStatusesForBundle(accountId, bundle, context);
+                }
+            });
+        }
+        for (final SubscriptionBundle ignored : bundles) {
+            try {
+                businessOverdueStatuses.addAll(completionService.take().get());
+            } catch (InterruptedException e) {
+                throw new AnalyticsRefreshException(e);
+            } catch (ExecutionException e) {
+                throw new AnalyticsRefreshException(e);
+            }
         }
 
         return businessOverdueStatuses;
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
index dfc2750..b84a0a6 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -46,14 +45,13 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
 
 public class BusinessSubscriptionTransitionFactory extends BusinessFactoryBase {
 
-    private static final int NB_THREADS = 20;
-
     private final Executor executor;
 
     public BusinessSubscriptionTransitionFactory(final OSGIKillbillLogService logService,
-                                                 final OSGIKillbillAPI osgiKillbillAPI) {
+                                                 final OSGIKillbillAPI osgiKillbillAPI,
+                                                 final Executor executor) {
         super(logService, osgiKillbillAPI);
-        executor = Executors.newFixedThreadPool(NB_THREADS);
+        this.executor = executor;
     }
 
     public Collection<BusinessSubscriptionTransitionModelDao> createBusinessSubscriptionTransitions(final UUID accountId,
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
index 7a6f40c..7cbae91 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
@@ -22,6 +22,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteWithEmbeddedDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
 import com.ning.billing.osgi.bundles.analytics.api.BusinessAccount;
 import com.ning.billing.osgi.bundles.analytics.api.BusinessSnapshot;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessAccountModelDao;
@@ -41,7 +42,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
                                                                                     reportGroup);
         analyticsSqlDao.create(accountModelDao.getTableName(), accountModelDao, callContext);
 
-        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource);
+        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.create(logService));
         final BusinessSnapshot businessSnapshot = analyticsUserApi.getBusinessSnapshot(account.getId(), callContext);
         Assert.assertEquals(businessSnapshot.getBusinessAccount(), new BusinessAccount(accountModelDao));
     }
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
index 84e9339..a909c5d 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
@@ -40,6 +40,7 @@ import com.ning.billing.catalog.api.Product;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteNoDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscription;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscriptionEvent;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscriptionTransitionModelDao;
@@ -71,7 +72,7 @@ public class TestBusinessBundleSummaryFactory extends AnalyticsTestSuiteNoDB {
             }
         }).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
 
-        bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null);
+        bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
     }
 
     @Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
index a153d52..77bfc4e 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
@@ -32,6 +32,7 @@ import org.testng.annotations.Test;
 import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.invoice.api.InvoiceItemType;
 import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteNoDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessInvoiceItemBaseModelDao;
 import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessInvoiceItemBaseModelDao.ItemSource;
 import com.ning.billing.osgi.bundles.analytics.utils.BusinessInvoiceUtils;
@@ -63,7 +64,7 @@ public class TestBusinessInvoiceFactory extends AnalyticsTestSuiteNoDB {
             }
         }).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
 
-        invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null);
+        invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
     }
 
     @Test(groups = "fast")