killbill-memoizeit
Changes
util/src/test/resources/log4j.xml 36(+36 -0)
Details
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
index 400227b..ec0b7b7 100644
--- a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
@@ -70,7 +70,7 @@ public interface IEventBus {
*
* @throws EventBusException if bus not been started yet
*/
- public void register(IEventBusType handlerInstance) throws EventBusException;
+ public void register(Object handlerInstance) throws EventBusException;
/**
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
index 0c346a1..0b4debb 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -29,6 +29,9 @@ import com.google.common.eventbus.AsyncEventBus;
public class MemoryEventBus implements IEventBus {
+ // STEPH config ?
+ private final static int MAX_EVENT_THREADS = 13;
+
private final static String EVENT_BUS_IDENTIFIER = "eventbus-service";
private final static String EVENT_BUS_GROUP_NAME = "eventbus-grp";
private final static String EVENT_BUS_TH_NAME = "eventbus-th";
@@ -54,14 +57,14 @@ public class MemoryEventBus implements IEventBus {
}
public void stop() {
- // STEPH TBD
+ // STEPH hum..
}
}
public MemoryEventBus() {
final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
- Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
+ Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(group, r, EVENT_BUS_TH_NAME);
@@ -73,9 +76,9 @@ public class MemoryEventBus implements IEventBus {
}
@Override
- public void register(IEventBusType handlerInstance) throws EventBusException {
+ public void register(Object handlerInstnace) throws EventBusException {
checkInitialized("register");
- delegate.register(handlerInstance);
+ delegate.register(handlerInstnace);
}
@Override
@@ -94,6 +97,7 @@ public class MemoryEventBus implements IEventBus {
public void start() {
if (isInitialized.compareAndSet(false, true)) {
log.info("MemoryEventBus started...");
+
}
}
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index c69014a..e4e41df 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -16,26 +16,95 @@
package com.ning.billing.util.eventbus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
public class TestEventBus {
- private EventBus bus;
- private AsyncEventBus asyncBus;
+ private static final Logger log = LoggerFactory.getLogger(TestEventBus.class);
+
+ private IEventBus eventBus;
@BeforeClass
public void setup() {
+ eventBus = new MemoryEventBus();
+ eventBus.start();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ eventBus.stop();
+ }
+
+ public static final class MyEvent implements IEventBusType {
+ String name;
+ Long value;
+ public MyEvent(String name, Long value) {
+ this.name = name;
+ this.value = value;
+ }
}
+ public static class MyEventHandler {
+
+ private final int expectedEvents;
+
+ private int gotEvents;
+
+
+ public MyEventHandler(int exp) {
+ this.expectedEvents = exp;
+ this.gotEvents = 0;
+ }
+
+ public synchronized int getEvents() {
+ return gotEvents;
+ }
+
+ @Subscribe
+ public synchronized void processEvent(MyEvent event) {
+ gotEvents++;
+ log.info("Got event {} {}", event.name, event.value);
+ }
+
+ public synchronized boolean waitForCompletion(long timeoutMs) {
+
+ while (gotEvents < expectedEvents) {
+ try {
+ wait(timeoutMs);
+ break;
+ } catch (InterruptedException ignore) {
+ }
+ }
+ return (gotEvents == expectedEvents);
+ }
+ }
@Test()
public void test() {
+ try {
+
+ int nbEvents = 127;
+ MyEventHandler handler = new MyEventHandler(nbEvents);
+ eventBus.register(handler);
+
+ for (int i = 0; i < nbEvents; i++) {
+ eventBus.post(new MyEvent("my-event", (long) i));
+ }
+
+ boolean completed = handler.waitForCompletion(3000);
+ Assert.assertEquals(completed, true);
+ } catch (Exception e) {
+ Assert.fail("",e);
+ }
}
}
util/src/test/resources/log4j.xml 36(+36 -0)
diff --git a/util/src/test/resources/log4j.xml b/util/src/test/resources/log4j.xml
new file mode 100644
index 0000000..ded1f80
--- /dev/null
+++ b/util/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2010-2011 Ning, Inc.
+ ~
+ ~ Ning licenses this file to you under the Apache License, version 2.0
+ ~ (the "License"); you may not use this file except in compliance with the
+ ~ License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%p %d{ISO8601} %X{trace} %t %c %m%n"/>
+ </layout>
+ </appender>
+
+
+ <logger name="com.ning.billing.util">
+ <level value="info"/>
+ </logger>
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="stdout"/>
+ </root>
+</log4j:configuration>