killbill-memoizeit

jruby: analytics: use Executors from killbill-concurrent Signed-off-by:

5/15/2013 6:01:58 PM

Details

diff --git a/osgi-bundles/bundles/analytics/pom.xml b/osgi-bundles/bundles/analytics/pom.xml
index 55bc399..d39c94d 100644
--- a/osgi-bundles/bundles/analytics/pom.xml
+++ b/osgi-bundles/bundles/analytics/pom.xml
@@ -56,6 +56,10 @@
         </dependency>
         <dependency>
             <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-concurrent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing.commons</groupId>
             <artifactId>killbill-embeddeddb</artifactId>
         </dependency>
         <dependency>
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
index f3879c2..4c52a44 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/AnalyticsActivator.java
@@ -40,7 +40,7 @@ public class AnalyticsActivator extends KillbillActivatorBase {
     public void start(final BundleContext context) throws Exception {
         super.start(context);
 
-        final Executor executor = BusinessExecutor.create(logService);
+        final Executor executor = BusinessExecutor.newCachedThreadPool();
 
         analyticsListener = new AnalyticsListener(logService, killbillAPI, dataSource, executor);
         dispatcher.registerEventHandler(analyticsListener);
diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
index a496f78..773d6ac 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -16,155 +16,32 @@
 
 package com.ning.billing.osgi.bundles.analytics;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.osgi.service.log.LogService;
-
-import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
+import com.ning.billing.commons.concurrent.Executors;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class BusinessExecutor extends ThreadPoolExecutor {
+public class BusinessExecutor {
 
     @VisibleForTesting
     static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
 
-    private final OSGIKillbillLogService logService;
-
-    public static BusinessExecutor create(final OSGIKillbillLogService logService) {
-        return new BusinessExecutor(0,
-                                    NB_THREADS,
-                                    60L,
-                                    TimeUnit.SECONDS,
-                                    new SynchronousQueue<Runnable>(),
-                                    new NamedThreadFactory("osgi-analytics"),
-                                    logService);
-    }
-
-    public BusinessExecutor(final int corePoolSize,
-                            final int maximumPoolSize,
-                            final long keepAliveTime,
-                            final TimeUnit unit,
-                            final BlockingQueue<Runnable> workQueue,
-                            final ThreadFactory threadFactory,
-                            final OSGIKillbillLogService logService) {
+    public static Executor newCachedThreadPool() {
         // Note: we don't use the default rejection handler here (AbortPolicy) as we always want the tasks to be executed
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new CallerRunsPolicy());
-        this.logService = logService;
-    }
-
-    @Override
-    public <T> Future<T> submit(final Callable<T> task) {
-        return super.submit(WrappedCallable.wrap(logService, task));
-    }
-
-    @Override
-    public <T> Future<T> submit(final Runnable task, final T result) {
-        // HACK: assumes ThreadPoolExecutor will create a callable and call execute()
-        // (can't wrap the runnable here or exception isn't re-thrown when Future.get() is called)
-        return super.submit(task, result);
-    }
+        return Executors.newCachedThreadPool(0,
+                                             NB_THREADS,
+                                             "osgi-analytics-refresh",
+                                             60L,
+                                             TimeUnit.SECONDS,
+                                             new CallerRunsPolicy());
 
-    @Override
-    public Future<?> submit(final Runnable task) {
-        return super.submit(WrappedRunnable.wrap(logService, task));
     }
 
-    @Override
-    public void execute(final Runnable command) {
-        super.execute(WrappedRunnable.wrap(logService, command));
-    }
-
-    private static class WrappedCallable<T> implements Callable<T> {
-
-        private final OSGIKillbillLogService logService;
-        private final Callable<T> callable;
-
-        private WrappedCallable(final OSGIKillbillLogService logService, final Callable<T> callable) {
-            this.logService = logService;
-            this.callable = callable;
-        }
-
-        public static <T> Callable<T> wrap(final OSGIKillbillLogService logService, final Callable<T> callable) {
-            return callable instanceof WrappedCallable ? callable : new WrappedCallable<T>(logService, callable);
-        }
-
-        @Override
-        public T call() throws Exception {
-            final Thread currentThread = Thread.currentThread();
-
-            try {
-                return callable.call();
-            } catch (Exception e) {
-                // since callables are expected to sometimes throw exceptions, log this at DEBUG instead of ERROR
-                logService.log(LogService.LOG_DEBUG, currentThread + " ended with an exception", e);
-
-                throw e;
-            } catch (Error e) {
-                logService.log(LogService.LOG_ERROR, currentThread + " ended with an exception", e);
-
-                throw e;
-            } finally {
-                logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
-            }
-        }
-    }
-
-    private static class WrappedRunnable implements Runnable {
-
-        private final OSGIKillbillLogService logService;
-        private final Runnable runnable;
-
-        private WrappedRunnable(final OSGIKillbillLogService logService, final Runnable runnable) {
-            this.logService = logService;
-            this.runnable = runnable;
-        }
-
-        public static Runnable wrap(final OSGIKillbillLogService logService, final Runnable runnable) {
-            return runnable instanceof WrappedRunnable ? runnable : new WrappedRunnable(logService, runnable);
-        }
-
-        @Override
-        public void run() {
-            final Thread currentThread = Thread.currentThread();
-
-            try {
-                runnable.run();
-            } catch (Throwable e) {
-                logService.log(LogService.LOG_ERROR, currentThread + " ended abnormally with an exception", e);
-            }
-
-            logService.log(LogService.LOG_DEBUG, currentThread + " finished executing");
-        }
-    }
-
-    /**
-     * Factory that sets the name of each thread it creates to {@code [name]-[id]}.
-     * This makes debugging stack traces much easier.
-     */
-    private static class NamedThreadFactory implements ThreadFactory {
-
-        private final AtomicInteger count = new AtomicInteger(0);
-        private final String name;
-
-        public NamedThreadFactory(final String name) {
-            this.name = name;
-        }
-
-        @Override
-        public Thread newThread(final Runnable runnable) {
-            final Thread thread = new Thread(runnable);
-
-            thread.setName(name + "-" + count.incrementAndGet());
-
-            return thread;
-        }
+    public static Executor newSingleThreadScheduledExecutor() {
+        return Executors.newSingleThreadScheduledExecutor("osgi-analytics-reports",
+                                                          new CallerRunsPolicy());
     }
 }
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
index 7cbae91..3c80ea2 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/api/user/TestDefaultAnalyticsUserApi.java
@@ -42,7 +42,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
                                                                                     reportGroup);
         analyticsSqlDao.create(accountModelDao.getTableName(), accountModelDao, callContext);
 
-        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.create(logService));
+        final AnalyticsUserApi analyticsUserApi = new AnalyticsUserApi(logService, killbillAPI, killbillDataSource, BusinessExecutor.newCachedThreadPool());
         final BusinessSnapshot businessSnapshot = analyticsUserApi.getBusinessSnapshot(account.getId(), callContext);
         Assert.assertEquals(businessSnapshot.getBusinessAccount(), new BusinessAccount(accountModelDao));
     }
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
index a909c5d..e21f156 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessBundleSummaryFactory.java
@@ -72,7 +72,7 @@ public class TestBusinessBundleSummaryFactory extends AnalyticsTestSuiteNoDB {
             }
         }).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
 
-        bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
+        bundleSummaryDao = new BusinessBundleSummaryFactory(osgiKillbillLogService, null, BusinessExecutor.newCachedThreadPool());
     }
 
     @Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
index 77bfc4e..6f45b5e 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/dao/factory/TestBusinessInvoiceFactory.java
@@ -64,7 +64,7 @@ public class TestBusinessInvoiceFactory extends AnalyticsTestSuiteNoDB {
             }
         }).when(osgiKillbillLogService).log(Mockito.anyInt(), Mockito.anyString());
 
-        invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.create(osgiKillbillLogService));
+        invoiceFactory = new BusinessInvoiceFactory(osgiKillbillLogService, null, BusinessExecutor.newCachedThreadPool());
     }
 
     @Test(groups = "fast")
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
index 947c336..f779d83 100644
--- a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
@@ -19,6 +19,7 @@ package com.ning.billing.osgi.bundles.analytics;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -50,7 +51,7 @@ public class TestBusinessExecutor extends AnalyticsTestSuiteNoDB {
 
     @Test(groups = "fast")
     public void testRejectionPolicy() throws Exception {
-        final BusinessExecutor executor = BusinessExecutor.create(logService);
+        final Executor executor = BusinessExecutor.newCachedThreadPool();
         final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
 
         final int totalTasksSize = BusinessExecutor.NB_THREADS * 50;
diff --git a/osgi-bundles/bundles/jruby/pom.xml b/osgi-bundles/bundles/jruby/pom.xml
index 4bc53fd..c949b06 100644
--- a/osgi-bundles/bundles/jruby/pom.xml
+++ b/osgi-bundles/bundles/jruby/pom.xml
@@ -40,6 +40,10 @@
             <artifactId>killbill-osgi-bundles-lib-killbill</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.ning.billing.commons</groupId>
+            <artifactId>killbill-concurrent</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
         </dependency>
diff --git a/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java b/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
index 33341aa..e98b1ab 100644
--- a/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
+++ b/osgi-bundles/bundles/jruby/src/main/java/com/ning/billing/osgi/bundles/jruby/JRubyActivator.java
@@ -19,7 +19,6 @@ package com.ning.billing.osgi.bundles.jruby;
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.log.LogService;
 
+import com.ning.billing.commons.concurrent.Executors;
 import com.ning.billing.osgi.api.config.PluginConfig.PluginType;
 import com.ning.billing.osgi.api.config.PluginConfigServiceApi;
 import com.ning.billing.osgi.api.config.PluginRubyConfig;
@@ -102,8 +102,8 @@ public class JRubyActivator extends KillbillActivatorBase {
         }
 
         final AtomicBoolean firstStart = new AtomicBoolean(true);
-        // TODO Switch to failsafe once in killbill-commons
-        restartFuture = Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+        restartFuture = Executors.newSingleThreadScheduledExecutor("jruby-restarter-" + pluginMain)
+                                 .scheduleWithFixedDelay(new Runnable() {
             long lastRestartMillis = System.currentTimeMillis();
 
             @Override

pom.xml 7(+6 -1)

diff --git a/pom.xml b/pom.xml
index 6d819af..1840247 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
     </scm>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <killbill-commons.version>0.1.3</killbill-commons.version>
+        <killbill-commons.version>0.1.5</killbill-commons.version>
         <slf4j.version>1.7.2</slf4j.version>
         <ehcache.version>2.6.2</ehcache.version>
     </properties>
@@ -316,6 +316,11 @@
             </dependency>
             <dependency>
                 <groupId>com.ning.billing.commons</groupId>
+                <artifactId>killbill-concurrent</artifactId>
+                <version>${killbill-commons.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.ning.billing.commons</groupId>
                 <artifactId>killbill-embeddeddb</artifactId>
                 <version>${killbill-commons.version}</version>
             </dependency>