killbill-memoizeit

Add initial unit test for EventBus

11/14/2011 9:23:31 PM

Details

diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
index 400227b..ec0b7b7 100644
--- a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
@@ -70,7 +70,7 @@ public interface IEventBus {
      *
      *  @throws EventBusException if bus not been started yet
      */
-    public void register(IEventBusType handlerInstance) throws EventBusException;
+    public void register(Object handlerInstance) throws EventBusException;
 
 
     /**
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
index 0c346a1..0b4debb 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -29,6 +29,9 @@ import com.google.common.eventbus.AsyncEventBus;
 
 public class MemoryEventBus implements IEventBus {
 
+    // STEPH config ?
+    private final static int MAX_EVENT_THREADS = 13;
+
     private final static String EVENT_BUS_IDENTIFIER = "eventbus-service";
     private final static String EVENT_BUS_GROUP_NAME = "eventbus-grp";
     private final static String EVENT_BUS_TH_NAME = "eventbus-th";
@@ -54,14 +57,14 @@ public class MemoryEventBus implements IEventBus {
         }
 
         public void stop() {
-            // STEPH TBD
+            // STEPH hum..
         }
     }
 
     public MemoryEventBus() {
 
         final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
-        Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
+        Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(group, r, EVENT_BUS_TH_NAME);
@@ -73,9 +76,9 @@ public class MemoryEventBus implements IEventBus {
     }
 
     @Override
-    public void register(IEventBusType handlerInstance) throws EventBusException {
+    public void register(Object handlerInstnace) throws EventBusException {
         checkInitialized("register");
-        delegate.register(handlerInstance);
+        delegate.register(handlerInstnace);
     }
 
     @Override
@@ -94,6 +97,7 @@ public class MemoryEventBus implements IEventBus {
     public void start() {
         if (isInitialized.compareAndSet(false, true)) {
             log.info("MemoryEventBus started...");
+
         }
     }
 
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index c69014a..e4e41df 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -16,26 +16,95 @@
 
 package com.ning.billing.util.eventbus;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 
 public class TestEventBus {
 
-    private EventBus bus;
-    private AsyncEventBus asyncBus;
+    private static final Logger log = LoggerFactory.getLogger(TestEventBus.class);
+
+    private IEventBus eventBus;
 
 
     @BeforeClass
     public void setup() {
+        eventBus = new MemoryEventBus();
+        eventBus.start();
+    }
+
+    @AfterClass
+    public void tearDown() {
+        eventBus.stop();
+    }
+
+    public static final class MyEvent implements IEventBusType {
+        String name;
+        Long value;
 
+        public MyEvent(String name, Long value) {
+            this.name = name;
+            this.value = value;
+        }
     }
 
+    public static class MyEventHandler {
+
+        private final int expectedEvents;
+
+        private int gotEvents;
+
+
+        public MyEventHandler(int exp) {
+            this.expectedEvents = exp;
+            this.gotEvents = 0;
+        }
+
+        public synchronized int getEvents() {
+            return gotEvents;
+        }
+
+        @Subscribe
+        public synchronized void processEvent(MyEvent event) {
+            gotEvents++;
+            log.info("Got event {} {}", event.name, event.value);
+        }
+
+        public synchronized boolean waitForCompletion(long timeoutMs) {
+
+            while (gotEvents < expectedEvents) {
+                try {
+                    wait(timeoutMs);
+                    break;
+                } catch (InterruptedException ignore) {
+                }
+            }
+            return (gotEvents == expectedEvents);
+        }
+    }
 
     @Test()
     public void test() {
+        try {
+
+            int nbEvents = 127;
+            MyEventHandler handler = new MyEventHandler(nbEvents);
+            eventBus.register(handler);
+
+            for (int i = 0; i < nbEvents; i++) {
+                eventBus.post(new MyEvent("my-event", (long) i));
+            }
+
+            boolean completed = handler.waitForCompletion(3000);
+            Assert.assertEquals(completed, true);
+        } catch (Exception e) {
+            Assert.fail("",e);
+        }
 
     }
 }
diff --git a/util/src/test/resources/log4j.xml b/util/src/test/resources/log4j.xml
new file mode 100644
index 0000000..ded1f80
--- /dev/null
+++ b/util/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2010-2011 Ning, Inc.
+  ~
+  ~ Ning licenses this file to you under the Apache License, version 2.0
+  ~ (the "License"); you may not use this file except in compliance with the
+  ~ License.  You may obtain a copy of the License at:
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%p	%d{ISO8601}	%X{trace}	%t	%c	%m%n"/>
+        </layout>
+    </appender>
+
+
+    <logger name="com.ning.billing.util">
+        <level value="info"/>
+    </logger>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="stdout"/>
+    </root>
+</log4j:configuration>