killbill-memoizeit

Details

diff --git a/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java b/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java
new file mode 100644
index 0000000..e947df8
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java
@@ -0,0 +1,59 @@
+package com.ning.billing.payment;
+
+import javax.annotation.Nullable;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.IEventBus.EventBusException;
+
+public class EventBusFuture<T, V extends IEventBusResponseType<T>> extends AbstractFuture<V> {
+    public static <V, W extends IEventBusRequestType<V>, X extends IEventBusResponseType<V>> EventBusFuture<V, X> post(final IEventBus eventBus, final W event) throws EventBusException {
+        final EventBusFuture<V, X> responseFuture = new EventBusFuture<V, X>(eventBus, event.getId());
+
+        eventBus.register(responseFuture);
+        eventBus.post(event);
+        return responseFuture;
+    }
+
+    private final IEventBus eventBus;
+    private final T requestId;
+
+    private EventBusFuture(IEventBus eventBus, T requestId) {
+        this.eventBus = eventBus;
+        this.requestId = requestId;
+    }
+
+    @Subscribe
+    public void handleResponse(V response) {
+        if (requestId.equals(response.getRequestId())) {
+            set(response);
+        }
+    }
+
+    @Override
+    public boolean set(@Nullable V value) {
+        boolean result = super.set(value);
+
+        try {
+            eventBus.unregister(this);
+        }
+        catch (EventBusException ex) {
+            throw new RuntimeException(ex);
+        }
+        return result;
+    }
+
+    @Override
+    public boolean setException(Throwable throwable) {
+        boolean result = super.setException(throwable);
+
+        try {
+            eventBus.unregister(this);
+        }
+        catch (EventBusException ex) {
+            throw new RuntimeException(ex);
+        }
+        return result;
+    }
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java b/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java
new file mode 100644
index 0000000..3a0d787
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java
@@ -0,0 +1,7 @@
+package com.ning.billing.payment;
+
+import com.ning.billing.util.eventbus.IEventBusType;
+
+public interface IEventBusRequestType<T> extends IEventBusType {
+    T getId();
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java b/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java
new file mode 100644
index 0000000..d658678
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java
@@ -0,0 +1,7 @@
+package com.ning.billing.payment;
+
+import com.ning.billing.util.eventbus.IEventBusType;
+
+public interface IEventBusResponseType<T> extends IEventBusType {
+    T getRequestId();
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java b/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java
new file mode 100644
index 0000000..521c3a0
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java
@@ -0,0 +1,86 @@
+package com.ning.billing.payment;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.Subscribe;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.MemoryEventBus;
+
+@Test
+public class TestSyncWaitOnEventBus {
+    private static final class TestEvent implements IEventBusRequestType<UUID> {
+        private final UUID id;
+        private final String msg;
+
+        public TestEvent(UUID id, String msg) {
+            this.id = id;
+            this.msg = msg;
+        }
+
+        @Override
+        public UUID getId() {
+            return id;
+        }
+
+        public String getMsg() {
+            return msg;
+        }
+    }
+
+    private static final class TestResponse implements IEventBusResponseType<UUID> {
+        private final UUID id;
+        private final String msg;
+
+        public TestResponse(UUID id, String msg) {
+            this.id = id;
+            this.msg = msg;
+        }
+
+        @Override
+        public UUID getRequestId() {
+            return id;
+        }
+
+        public String getMsg() {
+            return msg;
+        }
+    }
+
+    private IEventBus eventBus;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        eventBus = new MemoryEventBus();
+        eventBus.start();
+        eventBus.register(new Object() {
+            @Subscribe
+            public void handleEvent(TestEvent event) throws Exception {
+                Thread.sleep(100);
+                eventBus.post(new TestResponse(event.getId(), event.getMsg()));
+            }
+        });
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        eventBus.stop();
+    }
+
+    public void test() throws Exception {
+        final TestEvent event = new TestEvent(UUID.randomUUID(), "Hello World!");
+
+        Future<TestResponse> future = EventBusFuture.post(eventBus, event);
+        TestResponse response = future.get(1, TimeUnit.SECONDS);
+
+        assertEquals(response.getRequestId(), event.getId());
+        assertEquals(response.getMsg(), event.getMsg());
+    }
+}