killbill-uncached
Changes
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java 7(+5 -2)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java 14(+8 -6)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java 6(+4 -2)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java 166(+166 -0)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java 12(+7 -5)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java 6(+4 -2)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java 3(+2 -1)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java 8(+5 -3)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java 6(+4 -2)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java 10(+6 -4)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/AdjustmentInvoiceItemForRepair.java 135(+0 -135)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java 45(+37 -8)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java 43(+34 -9)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java 8(+3 -5)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java 33(+30 -3)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java 8(+3 -5)
osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java 3(+2 -1)
Details
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
index d177004..f3879c2 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
@@ -17,6 +17,7 @@
package com.ning.billing.osgi.bundles.analytics;
import java.util.Hashtable;
+import java.util.concurrent.Executor;
import javax.servlet.Servlet;
import javax.servlet.http.HttpServlet;
@@ -39,10 +40,12 @@ public class AnalyticsActivator extends KillbillActivatorBase {
public void start(final BundleContext context) throws Exception {
super.start(context);
- analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource);
+ final Executor executor = BusinessExecutor.create(logService);
+
+ analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource, executor);
dispatcher.registerEventHandler(analyticsListener);
- final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, dataSource);
+ final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, dataSource, executor);
final AnalyticsServlet analyticsServlet = new AnalyticsServlet(analyticsUserApi, logService);
registerServlet(context, analyticsServlet);
}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
index 5e2afc3..c636c01 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsListener.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -59,16 +60,17 @@ public class AnalyticsListener implements OSGIKillbillEventHandler {
public AnalyticsListener(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
- final OSGIKillbillDataSource osgiKillbillDataSource) {
+ final OSGIKillbillDataSource osgiKillbillDataSource,
+ final Executor executor) {
this.logService = logService;
- this.bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
- this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
- this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
- this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+ this.bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
+ this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+ this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+ this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
this.bFieldDao = new BusinessFieldDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
this.bTagDao = new BusinessTagDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
- this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+ this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
// TODO Do we still need it?
this.locker = new MySqlGlobalLocker(osgiKillbillDataSource.getDataSource());
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
index b2ac8b4..f2cb9f2 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/api/user/AnalyticsUserApi.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.api.user;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.Executor;
import com.ning.billing.osgi.bundles.analytics.AnalyticsRefreshException;
import com.ning.billing.osgi.bundles.analytics.api.BusinessAccount;
@@ -43,9 +44,10 @@ public class AnalyticsUserApi {
public AnalyticsUserApi(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
- final OSGIKillbillDataSource osgiKillbillDataSource) {
+ final OSGIKillbillDataSource osgiKillbillDataSource,
+ final Executor executor) {
this.analyticsDao = new AnalyticsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
- this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+ this.allBusinessObjectsDao = new AllBusinessObjectsDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
}
public BusinessSnapshot getBusinessSnapshot(final UUID accountId, final TenantContext context) {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
new file mode 100644
index 0000000..75c8a98
--- /dev/null
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.osgi.bundles.analytics;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.osgi.service.log.LogService;
+
+import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
+
+public class BusinessExecutor extends ThreadPoolExecutor {
+
+ private static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
+
+ private final OSGIKillbillLogService logService;
+
+ public static BusinessExecutor create(final OSGIKillbillLogService logService) {
+ return new BusinessExecutor(0,
+ NB_THREADS,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory("osgi-analytics"),
+ logService);
+ }
+
+ public BusinessExecutor(final int corePoolSize,
+ final int maximumPoolSize,
+ final long keepAliveTime,
+ final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue,
+ final ThreadFactory threadFactory,
+ final OSGIKillbillLogService logService) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ this.logService = logService;
+ }
+
+ @Override
+ public <T> Future<T> submit(final Callable<T> task) {
+ return super.submit(WrappedCallable.wrap(logService, task));
+ }
+
+ @Override
+ public <T> Future<T> submit(final Runnable task, final T result) {
+ // HACK: assumes ThreadPoolExecutor will create a callable and call execute()
+ // (can't wrap the runnable here or exception isn't re-thrown when Future.get() is called)
+ return super.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(final Runnable task) {
+ return super.submit(WrappedRunnable.wrap(logService, task));
+ }
+
+ @Override
+ public void execute(final Runnable command) {
+ super.execute(WrappedRunnable.wrap(logService, command));
+ }
+
+ private static class WrappedCallable<T> implements Callable<T> {
+
+ private final OSGIKillbillLogService logService;
+ private final Callable<T> callable;
+
+ private WrappedCallable(final OSGIKillbillLogService logService, final Callable<T> callable) {
+ this.logService = logService;
+ this.callable = callable;
+ }
+
+ public static <T> Callable<T> wrap(final OSGIKillbillLogService logService, final Callable<T> callable) {
+ return callable instanceof WrappedCallable ? callable : new WrappedCallable<T>(logService, callable);
+ }
+
+ @Override
+ public T call() throws Exception {
+ final Thread currentThread = Thread.currentThread();
+
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ // since callables are expected to sometimes throw exceptions, log this at DEBUG instead of ERROR
+ logService.log(LogService.LOG_DEBUG, currentThread + " ended with an exception", e);
+
+ throw e;
+ } catch (Error e) {
+ logService.log(LogService.LOG_ERROR, currentThread + " ended with an exception", e);
+
+ throw e;
+ } finally {
+ logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
+ }
+ }
+ }
+
+ private static class WrappedRunnable implements Runnable {
+
+ private final OSGIKillbillLogService logService;
+ private final Runnable runnable;
+
+ private WrappedRunnable(final OSGIKillbillLogService logService, final Runnable runnable) {
+ this.logService = logService;
+ this.runnable = runnable;
+ }
+
+ public static Runnable wrap(final OSGIKillbillLogService logService, final Runnable runnable) {
+ return runnable instanceof WrappedRunnable ? runnable : new WrappedRunnable(logService, runnable);
+ }
+
+ @Override
+ public void run() {
+ final Thread currentThread = Thread.currentThread();
+
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ logService.log(LogService.LOG_ERROR, currentThread + " ended abnormally with an exception", e);
+ }
+
+ logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
+ }
+ }
+
+ /**
+ * Factory that sets the name of each thread it creates to {@code [name]-[id]}.
+ * This makes debugging stack traces much easier.
+ */
+ private static class NamedThreadFactory implements ThreadFactory {
+
+ private final AtomicInteger count = new AtomicInteger(0);
+ private final String name;
+
+ public NamedThreadFactory(final String name) {
+ this.name = name;
+ }
+
+ @Override
+ public Thread newThread(final Runnable runnable) {
+ final Thread thread = new Thread(runnable);
+
+ thread.setName(name + "-" + count.incrementAndGet());
+
+ return thread;
+ }
+ }
+}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
index 49f95d8..13401e1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/AllBusinessObjectsDao.java
@@ -17,6 +17,7 @@
package com.ning.billing.osgi.bundles.analytics.dao;
import java.util.UUID;
+import java.util.concurrent.Executor;
import org.osgi.service.log.LogService;
@@ -38,13 +39,14 @@ public class AllBusinessObjectsDao {
public AllBusinessObjectsDao(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
- final OSGIKillbillDataSource osgiKillbillDataSource) {
+ final OSGIKillbillDataSource osgiKillbillDataSource,
+ final Executor executor) {
this.logService = logService;
- final BusinessAccountDao bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
- this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
- this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao);
- this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
+ final BusinessAccountDao bacDao = new BusinessAccountDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
+ this.bstDao = new BusinessSubscriptionTransitionDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+ this.binAndBipDao = new BusinessInvoiceAndInvoicePaymentDao(logService, osgiKillbillAPI, osgiKillbillDataSource, bacDao, executor);
+ this.bosDao = new BusinessOverdueStatusDao(logService, osgiKillbillAPI, osgiKillbillDataSource, executor);
this.bFieldDao = new BusinessFieldDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
this.bTagDao = new BusinessTagDao(logService, osgiKillbillAPI, osgiKillbillDataSource);
}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
index de5eb53..f28be62 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessAccountDao.java
@@ -17,6 +17,7 @@
package com.ning.billing.osgi.bundles.analytics.dao;
import java.util.UUID;
+import java.util.concurrent.Executor;
import org.osgi.service.log.LogService;
import org.skife.jdbi.v2.Transaction;
@@ -36,9 +37,10 @@ public class BusinessAccountDao extends BusinessAnalyticsDaoBase {
public BusinessAccountDao(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
- final OSGIKillbillDataSource osgiKillbillDataSource) {
+ final OSGIKillbillDataSource osgiKillbillDataSource,
+ final Executor executor) {
super(logService, osgiKillbillDataSource);
- bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
+ bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
}
public void update(final UUID accountId, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
index 7a92f05..b8d23a0 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessBundleSummaryDao.java
@@ -25,7 +25,8 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
public class BusinessBundleSummaryDao extends BusinessAnalyticsDaoBase {
- public BusinessBundleSummaryDao(final OSGIKillbillLogService logService, final OSGIKillbillDataSource osgiKillbillDataSource) {
+ public BusinessBundleSummaryDao(final OSGIKillbillLogService logService,
+ final OSGIKillbillDataSource osgiKillbillDataSource) {
super(logService, osgiKillbillDataSource);
}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
index 3f14461..dced685 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessInvoiceAndInvoicePaymentDao.java
@@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executor;
import org.osgi.service.log.LogService;
import org.skife.jdbi.v2.Transaction;
@@ -61,13 +62,14 @@ public class BusinessInvoiceAndInvoicePaymentDao extends BusinessAnalyticsDaoBas
public BusinessInvoiceAndInvoicePaymentDao(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
final OSGIKillbillDataSource osgiKillbillDataSource,
- final BusinessAccountDao businessAccountDao) {
+ final BusinessAccountDao businessAccountDao,
+ final Executor executor) {
super(logService, osgiKillbillDataSource);
this.businessAccountDao = businessAccountDao;
this.businessInvoiceDao = new BusinessInvoiceDao(logService, osgiKillbillDataSource);
this.businessInvoicePaymentDao = new BusinessInvoicePaymentDao(logService, osgiKillbillDataSource);
- bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
- binFactory = new BusinessInvoiceFactory(logService, osgiKillbillAPI);
+ bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
+ binFactory = new BusinessInvoiceFactory(logService, osgiKillbillAPI, executor);
bipFactory = new BusinessInvoicePaymentFactory(logService, osgiKillbillAPI);
}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
index d1cc126..4678dbc 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessOverdueStatusDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.dao;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.Executor;
import org.osgi.service.log.LogService;
import org.skife.jdbi.v2.Transaction;
@@ -39,10 +40,11 @@ public class BusinessOverdueStatusDao extends BusinessAnalyticsDaoBase {
public BusinessOverdueStatusDao(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
- final OSGIKillbillDataSource osgiKillbillDataSource) {
+ final OSGIKillbillDataSource osgiKillbillDataSource,
+ final Executor executor) {
super(logService, osgiKillbillDataSource);
this.logService = logService;
- bosFactory = new BusinessOverdueStatusFactory(logService, osgiKillbillAPI);
+ bosFactory = new BusinessOverdueStatusFactory(logService, osgiKillbillAPI, executor);
}
public void update(final UUID accountId, final ObjectType objectType, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
index fc77ed9..ac7b8c1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/BusinessSubscriptionTransitionDao.java
@@ -18,6 +18,7 @@ package com.ning.billing.osgi.bundles.analytics.dao;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.Executor;
import org.osgi.service.log.LogService;
import org.skife.jdbi.v2.Transaction;
@@ -46,13 +47,14 @@ public class BusinessSubscriptionTransitionDao extends BusinessAnalyticsDaoBase
public BusinessSubscriptionTransitionDao(final OSGIKillbillLogService logService,
final OSGIKillbillAPI osgiKillbillAPI,
final OSGIKillbillDataSource osgiKillbillDataSource,
- final BusinessAccountDao businessAccountDao) {
+ final BusinessAccountDao businessAccountDao,
+ final Executor executor) {
super(logService, osgiKillbillDataSource);
this.businessAccountDao = businessAccountDao;
this.businessBundleSummaryDao = new BusinessBundleSummaryDao(logService, osgiKillbillDataSource);
- bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI);
- bbsFactory = new BusinessBundleSummaryFactory(logService, osgiKillbillAPI);
- bstFactory = new BusinessSubscriptionTransitionFactory(logService, osgiKillbillAPI);
+ bacFactory = new BusinessAccountFactory(logService, osgiKillbillAPI, executor);
+ bbsFactory = new BusinessBundleSummaryFactory(logService, osgiKillbillAPI, executor);
+ bstFactory = new BusinessSubscriptionTransitionFactory(logService, osgiKillbillAPI, executor);
}
public void update(final UUID accountId, final CallContext context) throws AnalyticsRefreshException {
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
index 3154362..982f725 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessAccountFactory.java
@@ -20,6 +20,12 @@ import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.atomic.AtomicInteger;
import com.ning.billing.account.api.Account;
import com.ning.billing.catalog.api.ProductCategory;
@@ -37,9 +43,13 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
public class BusinessAccountFactory extends BusinessFactoryBase {
+ private final Executor executor;
+
public BusinessAccountFactory(final OSGIKillbillLogService logService,
- final OSGIKillbillAPI osgiKillbillAPI) {
+ final OSGIKillbillAPI osgiKillbillAPI,
+ final Executor executor) {
super(logService, osgiKillbillAPI);
+ this.executor = executor;
}
public BusinessAccountModelDao createBusinessAccount(final UUID accountId,
@@ -72,15 +82,34 @@ public class BusinessAccountFactory extends BusinessFactoryBase {
}
}
+ // We fetch the subscriptions in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+ final CompletionService<Void> completionService = new ExecutorCompletionService<Void>(executor);
final List<SubscriptionBundle> bundles = getSubscriptionBundlesForAccount(account.getId(), context);
- int nbActiveBundles = 0;
+ final AtomicInteger nbActiveBundles = new AtomicInteger(0);
for (final SubscriptionBundle bundle : bundles) {
- final Collection<Subscription> subscriptionsForBundle = getSubscriptionsForBundle(bundle.getId(), context);
- for (final Subscription subscription : subscriptionsForBundle) {
- if (ProductCategory.BASE.equals(subscription.getCategory()) &&
- !(subscription.getEndDate() != null && !subscription.getEndDate().isAfterNow())) {
- nbActiveBundles++;
+ completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ final Collection<Subscription> subscriptionsForBundle = getSubscriptionsForBundle(bundle.getId(), context);
+ for (final Subscription subscription : subscriptionsForBundle) {
+ if (ProductCategory.BASE.equals(subscription.getCategory()) &&
+ !(subscription.getEndDate() != null && !subscription.getEndDate().isAfterNow())) {
+ nbActiveBundles.incrementAndGet();
+ break;
+ }
+ }
+
+ return null;
}
+ });
+ }
+ for (final SubscriptionBundle ignored : bundles) {
+ try {
+ completionService.take().get();
+ } catch (InterruptedException e) {
+ throw new AnalyticsRefreshException(e);
+ } catch (ExecutionException e) {
+ throw new AnalyticsRefreshException(e);
}
}
@@ -93,7 +122,7 @@ public class BusinessAccountFactory extends BusinessFactoryBase {
accountBalance,
lastInvoice,
lastPayment,
- nbActiveBundles,
+ nbActiveBundles.get(),
creationAuditLog,
tenantRecordId,
reportGroup);
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
index 57a7599..4c25d75 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessBundleSummaryFactory.java
@@ -22,6 +22,11 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
import com.ning.billing.account.api.Account;
import com.ning.billing.catalog.api.ProductCategory;
@@ -42,9 +47,13 @@ import com.google.common.collect.Ordering;
public class BusinessBundleSummaryFactory extends BusinessFactoryBase {
+ private final Executor executor;
+
public BusinessBundleSummaryFactory(final OSGIKillbillLogService logService,
- final OSGIKillbillAPI osgiKillbillAPI) {
+ final OSGIKillbillAPI osgiKillbillAPI,
+ final Executor executor) {
super(logService, osgiKillbillAPI);
+ this.executor = executor;
}
public Collection<BusinessBundleSummaryModelDao> createBusinessBundleSummaries(final UUID accountId,
@@ -59,17 +68,33 @@ public class BusinessBundleSummaryFactory extends BusinessFactoryBase {
final Map<UUID, BusinessSubscriptionTransitionModelDao> bstForBundle = new LinkedHashMap<UUID, BusinessSubscriptionTransitionModelDao>();
filterBstsForBasePlans(bsts, rankForBundle, bstForBundle);
+ // We fetch the bundles in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+ final CompletionService<BusinessBundleSummaryModelDao> completionService = new ExecutorCompletionService<BusinessBundleSummaryModelDao>(executor);
final Collection<BusinessBundleSummaryModelDao> bbss = new LinkedList<BusinessBundleSummaryModelDao>();
for (final BusinessSubscriptionTransitionModelDao bst : bstForBundle.values()) {
- final BusinessBundleSummaryModelDao bbs = buildBBS(account,
- accountRecordId,
- bst,
- rankForBundle.get(bst.getBundleId()),
- tenantRecordId,
- reportGroup,
- context);
- bbss.add(bbs);
+ completionService.submit(new Callable<BusinessBundleSummaryModelDao>() {
+ @Override
+ public BusinessBundleSummaryModelDao call() throws Exception {
+ return buildBBS(account,
+ accountRecordId,
+ bst,
+ rankForBundle.get(bst.getBundleId()),
+ tenantRecordId,
+ reportGroup,
+ context);
+ }
+ });
+ }
+ for (final BusinessSubscriptionTransitionModelDao ignored : bstForBundle.values()) {
+ try {
+ bbss.add(completionService.take().get());
+ } catch (InterruptedException e) {
+ throw new AnalyticsRefreshException(e);
+ } catch (ExecutionException e) {
+ throw new AnalyticsRefreshException(e);
+ }
}
+
return bbss;
}
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
index 3174f6d..e2942b1 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessInvoiceFactory.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -65,14 +64,13 @@ import static com.ning.billing.osgi.bundles.analytics.utils.BusinessInvoiceUtils
public class BusinessInvoiceFactory extends BusinessFactoryBase {
- private static final int NB_THREADS = 20;
-
private final Executor executor;
public BusinessInvoiceFactory(final OSGIKillbillLogService logService,
- final OSGIKillbillAPI osgiKillbillAPI) {
+ final OSGIKillbillAPI osgiKillbillAPI,
+ final Executor executor) {
super(logService, osgiKillbillAPI);
- executor = Executors.newFixedThreadPool(NB_THREADS);
+ this.executor = executor;
}
/**
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
index d346cc1..94a395a 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessOverdueStatusFactory.java
@@ -20,6 +20,11 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
import org.joda.time.DateTime;
@@ -39,18 +44,40 @@ import com.google.common.collect.Lists;
public class BusinessOverdueStatusFactory extends BusinessFactoryBase {
+ private final Executor executor;
+
public BusinessOverdueStatusFactory(final OSGIKillbillLogService logService,
- final OSGIKillbillAPI osgiKillbillAPI) {
+ final OSGIKillbillAPI osgiKillbillAPI,
+ final Executor executor) {
super(logService, osgiKillbillAPI);
+ this.executor = executor;
}
public Collection<BusinessOverdueStatusModelDao> createBusinessOverdueStatuses(final UUID accountId,
final CallContext context) throws AnalyticsRefreshException {
+ // We fetch the bundles in parallel as these can be very large on a per account basis (@see BusinessSubscriptionTransitionFactory)
+ // We don't care about the overall ordering but we do care about ordering for
+ // a given bundle (we'd like the generated record ids to be sequential).
+ final CompletionService<Collection<BusinessOverdueStatusModelDao>> completionService = new ExecutorCompletionService<Collection<BusinessOverdueStatusModelDao>>(executor);
final Collection<SubscriptionBundle> bundles = getSubscriptionBundlesForAccount(accountId, context);
final Collection<BusinessOverdueStatusModelDao> businessOverdueStatuses = new LinkedList<BusinessOverdueStatusModelDao>();
for (final SubscriptionBundle bundle : bundles) {
- // Recompute all blocking states for that bundle
- businessOverdueStatuses.addAll(createBusinessOverdueStatusesForBundle(accountId, bundle, context));
+ completionService.submit(new Callable<Collection<BusinessOverdueStatusModelDao>>() {
+ @Override
+ public Collection<BusinessOverdueStatusModelDao> call() throws Exception {
+ // Recompute all blocking states for that bundle
+ return createBusinessOverdueStatusesForBundle(accountId, bundle, context);
+ }
+ });
+ }
+ for (final SubscriptionBundle ignored : bundles) {
+ try {
+ businessOverdueStatuses.addAll(completionService.take().get());
+ } catch (InterruptedException e) {
+ throw new AnalyticsRefreshException(e);
+ } catch (ExecutionException e) {
+ throw new AnalyticsRefreshException(e);
+ }
}
return businessOverdueStatuses;
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
index dfc2750..b84a0a6 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/dao/factory/BusinessSubscriptionTransitionFactory.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -46,14 +45,13 @@ import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
public class BusinessSubscriptionTransitionFactory extends BusinessFactoryBase {
- private static final int NB_THREADS = 20;
-
private final Executor executor;
public BusinessSubscriptionTransitionFactory(final OSGIKillbillLogService logService,
- final OSGIKillbillAPI osgiKillbillAPI) {
+ final OSGIKillbillAPI osgiKillbillAPI,
+ final Executor executor) {
super(logService, osgiKillbillAPI);
- executor = Executors.newFixedThreadPool(NB_THREADS);
+ this.executor = executor;
}
public Collection<BusinessSubscriptionTransitionModelDao> createBusinessSubscriptionTransitions(final UUID accountId,
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
index 7a6f40c..7cbae91 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
@@ -22,6 +22,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteWithEmbeddedDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
import com.ning.billing.osgi.bundles.analytics.api.BusinessAccount;
import com.ning.billing.osgi.bundles.analytics.api.BusinessSnapshot;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessAccountModelDao;
@@ -41,7 +42,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
reportGroup);
analyticsSqlDao.create(accountModelDao.getTableName(), accountModelDao, callContext);
- final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource);
+ final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.create(logService));
final BusinessSnapshot businessSnapshot = analyticsUserApi.getBusinessSnapshot(account.getId(), callContext);
Assert.assertEquals(businessSnapshot.getBusinessAccount(), new BusinessAccount(accountModelDao));
}
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
index 84e9339..a909c5d 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
@@ -40,6 +40,7 @@ import com.ning.billing.catalog.api.Product;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteNoDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscription;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscriptionEvent;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessSubscriptionTransitionModelDao;
@@ -71,7 +72,7 @@ public class TestBusinessBundleSummaryFactory extends AnalyticsTestSuiteNoDB {
}
}).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
- bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null);
+ bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
}
@Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
index a153d52..77bfc4e 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
@@ -32,6 +32,7 @@ import org.testng.annotations.Test;
import com.ning.billing.invoice.api.InvoiceItem;
import com.ning.billing.invoice.api.InvoiceItemType;
import com.ning.billing.osgi.bundles.analytics.AnalyticsTestSuiteNoDB;
+import com.ning.billing.osgi.bundles.analytics.BusinessExecutor;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessInvoiceItemBaseModelDao;
import com.ning.billing.osgi.bundles.analytics.dao.model.BusinessInvoiceItemBaseModelDao.ItemSource;
import com.ning.billing.osgi.bundles.analytics.utils.BusinessInvoiceUtils;
@@ -63,7 +64,7 @@ public class TestBusinessInvoiceFactory extends AnalyticsTestSuiteNoDB {
}
}).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
- invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null);
+ invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
}
@Test(groups = "fast")