killbill-uncached

analytics: switch to CallerRunsPolicy for BusinessExecutor The

5/6/2013 8:04:20 PM

Details

diff --git a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
index 75c8a98..a496f78 100644
--- a/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
+++ b/osgi-bundles/bundles/analytics/src/main/java/com/ning/billing/osgi/bundles/analytics/BusinessExecutor.java
@@ -29,9 +29,12 @@ import org.osgi.service.log.LogService;
 
 import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class BusinessExecutor extends ThreadPoolExecutor {
 
-    private static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
+    @VisibleForTesting
+    static final Integer NB_THREADS = Integer.valueOf(System.getProperty("com.ning.billing.osgi.bundles.analytics.nb_threads", "100"));
 
     private final OSGIKillbillLogService logService;
 
@@ -52,7 +55,8 @@ public class BusinessExecutor extends ThreadPoolExecutor {
                             final BlockingQueue<Runnable> workQueue,
                             final ThreadFactory threadFactory,
                             final OSGIKillbillLogService logService) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        // Note: we don't use the default rejection handler here (AbortPolicy) as we always want the tasks to be executed
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new CallerRunsPolicy());
         this.logService = logService;
     }
 
diff --git a/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
new file mode 100644
index 0000000..947c336
--- /dev/null
+++ b/osgi-bundles/bundles/analytics/src/test/java/com/ning/billing/osgi/bundles/analytics/TestBusinessExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.osgi.bundles.analytics;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.killbill.osgi.libs.killbill.OSGIKillbillLogService;
+
+public class TestBusinessExecutor extends AnalyticsTestSuiteNoDB {
+
+    @Override
+    @BeforeMethod(groups = "fast")
+    public void setUp() throws Exception {
+        super.setUp();
+
+        logService = Mockito.mock(OSGIKillbillLogService.class);
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                //logger.info(Arrays.toString(invocation.getArguments()));
+                return null;
+            }
+        }).when(logService).log(Mockito.anyInt(), Mockito.anyString());
+    }
+
+    @Test(groups = "fast")
+    public void testRejectionPolicy() throws Exception {
+        final BusinessExecutor executor = BusinessExecutor.create(logService);
+        final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
+
+        final int totalTasksSize = BusinessExecutor.NB_THREADS * 50;
+        final AtomicInteger taskCounter = new AtomicInteger(totalTasksSize);
+        for (int i = 0; i < totalTasksSize; i++) {
+            completionService.submit(new Callable<Integer>() {
+                @Override
+                public Integer call() throws Exception {
+                    // Sleep a bit to trigger the rejection
+                    Thread.sleep(100);
+                    taskCounter.getAndDecrement();
+                    return 1;
+                }
+            });
+        }
+
+        int results = 0;
+        for (int i = 0; i < totalTasksSize; i++) {
+            try {
+                // We want to make sure the policy didn't affect the completion queue of the ExecutorCompletionService
+                results += completionService.take().get();
+            } catch (InterruptedException e) {
+                Assert.fail();
+            } catch (ExecutionException e) {
+                Assert.fail();
+            }
+        }
+        Assert.assertEquals(taskCounter.get(), 0);
+        Assert.assertEquals(results, totalTasksSize);
+    }
+}