killbill-uncached
Changes
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java 2(+1 -1)
osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java 151(+14 -137)
osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java 2(+1 -1)
osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java 2(+1 -1)
osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java 2(+1 -1)
osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java 3(+2 -1)
osgi-bundles/bundles/jruby/pom.xml 4(+4 -0)
osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java 6(+3 -3)
pom.xml 7(+6 -1)
Details
diff --git a/osgi-bundles/bundles/analytics/pom.xml b/osgi-bundles/bundles/analytics/pom.xml
index 55bc399..d39c94d 100644
--- a/osgi-bundles/bundles/analytics/pom.xml
+++ b/osgi-bundles/bundles/analytics/pom.xml
@@ -56,6 +56,10 @@
</dependency>
<dependency>
<groupId>com.ning.billing.commons</groupId>
+ <artifactId>killbill-concurrent</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.ning.billing.commons</groupId>
<artifactId>killbill-embeddeddb</artifactId>
</dependency>
<dependency>
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
index f3879c2..4c52a44 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
@@ -40,7 +40,7 @@ public class AnalyticsActivator extends KillbillActivatorBase {
public void start(final BundleContext context) throws Exception {
super.start(context);
- final Executor executor = BusinessExecutor.create(logService);
+ final Executor executor = BusinessExecutor.newCachedThreadPool();
analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource, executor);
dispatcher.registerEventHandler(analyticsListener);
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
index a496f78..773d6ac 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -16,155 +16,32 @@
package com.ning.billing.osgi.bundles.analytics;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.osgi.service.log.LogService;
-
-import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
+import com.ning.billing.commons.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
-public class BusinessExecutor extends ThreadPoolExecutor {
+public class BusinessExecutor {
@VisibleForTesting
static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
- private final OSGIKillbillLogService logService;
-
- public static BusinessExecutor create(final OSGIKillbillLogService logService) {
- return new BusinessExecutor(0,
- NB_THREADS,
- 60L,
- TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new NamedThreadFactory("osgi-analytics"),
- logService);
- }
-
- public BusinessExecutor(final int corePoolSize,
- final int maximumPoolSize,
- final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue,
- final ThreadFactory threadFactory,
- final OSGIKillbillLogService logService) {
+ public static Executor newCachedThreadPool() {
// Note: we don't use the default rejection handler here (AbortPolicy) as we always want the tasks to be executed
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new CallerRunsPolicy());
- this.logService = logService;
- }
-
- @Override
- public <T> Future<T> submit(final Callable<T> task) {
- return super.submit(WrappedCallable.wrap(logService, task));
- }
-
- @Override
- public <T> Future<T> submit(final Runnable task, final T result) {
- // HACK: assumes ThreadPoolExecutor will create a callable and call execute()
- // (can't wrap the runnable here or exception isn't re-thrown when Future.get() is called)
- return super.submit(task, result);
- }
+ return Executors.newCachedThreadPool(0,
+ NB_THREADS,
+ "osgi-analytics-refresh",
+ 60L,
+ TimeUnit.SECONDS,
+ new CallerRunsPolicy());
- @Override
- public Future<?> submit(final Runnable task) {
- return super.submit(WrappedRunnable.wrap(logService, task));
}
- @Override
- public void execute(final Runnable command) {
- super.execute(WrappedRunnable.wrap(logService, command));
- }
-
- private static class WrappedCallable<T> implements Callable<T> {
-
- private final OSGIKillbillLogService logService;
- private final Callable<T> callable;
-
- private WrappedCallable(final OSGIKillbillLogService logService, final Callable<T> callable) {
- this.logService = logService;
- this.callable = callable;
- }
-
- public static <T> Callable<T> wrap(final OSGIKillbillLogService logService, final Callable<T> callable) {
- return callable instanceof WrappedCallable ? callable : new WrappedCallable<T>(logService, callable);
- }
-
- @Override
- public T call() throws Exception {
- final Thread currentThread = Thread.currentThread();
-
- try {
- return callable.call();
- } catch (Exception e) {
- // since callables are expected to sometimes throw exceptions, log this at DEBUG instead of ERROR
- logService.log(LogService.LOG_DEBUG, currentThread + " ended with an exception", e);
-
- throw e;
- } catch (Error e) {
- logService.log(LogService.LOG_ERROR, currentThread + " ended with an exception", e);
-
- throw e;
- } finally {
- logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
- }
- }
- }
-
- private static class WrappedRunnable implements Runnable {
-
- private final OSGIKillbillLogService logService;
- private final Runnable runnable;
-
- private WrappedRunnable(final OSGIKillbillLogService logService, final Runnable runnable) {
- this.logService = logService;
- this.runnable = runnable;
- }
-
- public static Runnable wrap(final OSGIKillbillLogService logService, final Runnable runnable) {
- return runnable instanceof WrappedRunnable ? runnable : new WrappedRunnable(logService, runnable);
- }
-
- @Override
- public void run() {
- final Thread currentThread = Thread.currentThread();
-
- try {
- runnable.run();
- } catch (Throwable e) {
- logService.log(LogService.LOG_ERROR, currentThread + " ended abnormally with an exception", e);
- }
-
- logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
- }
- }
-
- /**
- * Factory that sets the name of each thread it creates to {@code [name]-[id]}.
- * This makes debugging stack traces much easier.
- */
- private static class NamedThreadFactory implements ThreadFactory {
-
- private final AtomicInteger count = new AtomicInteger(0);
- private final String name;
-
- public NamedThreadFactory(final String name) {
- this.name = name;
- }
-
- @Override
- public Thread newThread(final Runnable runnable) {
- final Thread thread = new Thread(runnable);
-
- thread.setName(name + "-" + count.incrementAndGet());
-
- return thread;
- }
+ public static Executor newSingleThreadScheduledExecutor() {
+ return Executors.newSingleThreadScheduledExecutor("osgi-analytics-reports",
+ new CallerRunsPolicy());
}
}
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
index 7cbae91..3c80ea2 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
@@ -42,7 +42,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
reportGroup);
analyticsSqlDao.create(accountModelDao.getTableName(), accountModelDao, callContext);
- final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.create(logService));
+ final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.newCachedThreadPool());
final BusinessSnapshot businessSnapshot = analyticsUserApi.getBusinessSnapshot(account.getId(), callContext);
Assert.assertEquals(businessSnapshot.getBusinessAccount(), new BusinessAccount(accountModelDao));
}
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
index a909c5d..e21f156 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
@@ -72,7 +72,7 @@ public class TestBusinessBundleSummaryFactory extends AnalyticsTestSuiteNoDB {
}
}).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
- bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
+ bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.newCachedThreadPool());
}
@Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
index 77bfc4e..6f45b5e 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
@@ -64,7 +64,7 @@ public class TestBusinessInvoiceFactory extends AnalyticsTestSuiteNoDB {
}
}).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
- invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
+ invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.newCachedThreadPool());
}
@Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
index 947c336..f779d83 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
@@ -19,6 +19,7 @@ package com.ning.billing.osgi.bundles.analytics;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +51,7 @@ public class TestBusinessExecutor extends AnalyticsTestSuiteNoDB {
@Test(groups = "fast")
public void testRejectionPolicy() throws Exception {
- final BusinessExecutor executor = BusinessExecutor.create(logService);
+ final Executor executor = BusinessExecutor.newCachedThreadPool();
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
final int totalTasksSize = BusinessExecutor.NB_THREADS * 50;
osgi-bundles/bundles/jruby/pom.xml 4(+4 -0)
diff --git a/osgi-bundles/bundles/jruby/pom.xml b/osgi-bundles/bundles/jruby/pom.xml
index 4bc53fd..c949b06 100644
--- a/osgi-bundles/bundles/jruby/pom.xml
+++ b/osgi-bundles/bundles/jruby/pom.xml
@@ -40,6 +40,10 @@
<artifactId>killbill-osgi-bundles-lib-killbill</artifactId>
</dependency>
<dependency>
+ <groupId>com.ning.billing.commons</groupId>
+ <artifactId>killbill-concurrent</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
diff --git a/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java b/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
index 33341aa..e98b1ab 100644
--- a/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
+++ b/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
@@ -19,7 +19,6 @@ package com.ning.billing.osgi.bundles.jruby;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.osgi.framework.BundleContext;
import org.osgi.service.log.LogService;
+import com.ning.billing.commons.concurrent.Executors;
import com.ning.billing.osgi.api.config.PluginConfig.PluginType;
import com.ning.billing.osgi.api.config.PluginConfigServiceApi;
import com.ning.billing.osgi.api.config.PluginRubyConfig;
@@ -102,8 +102,8 @@ public class JRubyActivator extends KillbillActivatorBase {
}
final AtomicBoolean firstStart = new AtomicBoolean(true);
- // TODO Switch to failsafe once in killbill-commons
- restartFuture = Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+ restartFuture = Executors.newSingleThreadScheduledExecutor("jruby-restarter-" + pluginMain)
+ .scheduleWithFixedDelay(new Runnable() {
long lastRestartMillis = System.currentTimeMillis();
@Override
pom.xml 7(+6 -1)
diff --git a/pom.xml b/pom.xml
index 6d819af..1840247 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <killbill-commons.version>0.1.3</killbill-commons.version>
+ <killbill-commons.version>0.1.5</killbill-commons.version>
<slf4j.version>1.7.2</slf4j.version>
<ehcache.version>2.6.2</ehcache.version>
</properties>
@@ -316,6 +316,11 @@
</dependency>
<dependency>
<groupId>com.ning.billing.commons</groupId>
+ <artifactId>killbill-concurrent</artifactId>
+ <version>${killbill-commons.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ning.billing.commons</groupId>
<artifactId>killbill-embeddeddb</artifactId>
<version>${killbill-commons.version}</version>
</dependency>