Details
diff --git a/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java b/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java
new file mode 100644
index 0000000..e947df8
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/EventBusFuture.java
@@ -0,0 +1,59 @@
+package com.ning.billing.payment;
+
+import javax.annotation.Nullable;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.IEventBus.EventBusException;
+
+public class EventBusFuture<T, V extends IEventBusResponseType<T>> extends AbstractFuture<V> {
+ public static <V, W extends IEventBusRequestType<V>, X extends IEventBusResponseType<V>> EventBusFuture<V, X> post(final IEventBus eventBus, final W event) throws EventBusException {
+ final EventBusFuture<V, X> responseFuture = new EventBusFuture<V, X>(eventBus, event.getId());
+
+ eventBus.register(responseFuture);
+ eventBus.post(event);
+ return responseFuture;
+ }
+
+ private final IEventBus eventBus;
+ private final T requestId;
+
+ private EventBusFuture(IEventBus eventBus, T requestId) {
+ this.eventBus = eventBus;
+ this.requestId = requestId;
+ }
+
+ @Subscribe
+ public void handleResponse(V response) {
+ if (requestId.equals(response.getRequestId())) {
+ set(response);
+ }
+ }
+
+ @Override
+ public boolean set(@Nullable V value) {
+ boolean result = super.set(value);
+
+ try {
+ eventBus.unregister(this);
+ }
+ catch (EventBusException ex) {
+ throw new RuntimeException(ex);
+ }
+ return result;
+ }
+
+ @Override
+ public boolean setException(Throwable throwable) {
+ boolean result = super.setException(throwable);
+
+ try {
+ eventBus.unregister(this);
+ }
+ catch (EventBusException ex) {
+ throw new RuntimeException(ex);
+ }
+ return result;
+ }
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java b/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java
new file mode 100644
index 0000000..3a0d787
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/IEventBusRequestType.java
@@ -0,0 +1,7 @@
+package com.ning.billing.payment;
+
+import com.ning.billing.util.eventbus.IEventBusType;
+
+public interface IEventBusRequestType<T> extends IEventBusType {
+ T getId();
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java b/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java
new file mode 100644
index 0000000..d658678
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/IEventBusResponseType.java
@@ -0,0 +1,7 @@
+package com.ning.billing.payment;
+
+import com.ning.billing.util.eventbus.IEventBusType;
+
+public interface IEventBusResponseType<T> extends IEventBusType {
+ T getRequestId();
+}
diff --git a/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java b/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java
new file mode 100644
index 0000000..521c3a0
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/TestSyncWaitOnEventBus.java
@@ -0,0 +1,86 @@
+package com.ning.billing.payment;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.Subscribe;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.MemoryEventBus;
+
+@Test
+public class TestSyncWaitOnEventBus {
+ private static final class TestEvent implements IEventBusRequestType<UUID> {
+ private final UUID id;
+ private final String msg;
+
+ public TestEvent(UUID id, String msg) {
+ this.id = id;
+ this.msg = msg;
+ }
+
+ @Override
+ public UUID getId() {
+ return id;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+ }
+
+ private static final class TestResponse implements IEventBusResponseType<UUID> {
+ private final UUID id;
+ private final String msg;
+
+ public TestResponse(UUID id, String msg) {
+ this.id = id;
+ this.msg = msg;
+ }
+
+ @Override
+ public UUID getRequestId() {
+ return id;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+ }
+
+ private IEventBus eventBus;
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp() throws Exception {
+ eventBus = new MemoryEventBus();
+ eventBus.start();
+ eventBus.register(new Object() {
+ @Subscribe
+ public void handleEvent(TestEvent event) throws Exception {
+ Thread.sleep(100);
+ eventBus.post(new TestResponse(event.getId(), event.getMsg()));
+ }
+ });
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ eventBus.stop();
+ }
+
+ public void test() throws Exception {
+ final TestEvent event = new TestEvent(UUID.randomUUID(), "Hello World!");
+
+ Future<TestResponse> future = EventBusFuture.post(eventBus, event);
+ TestResponse response = future.get(1, TimeUnit.SECONDS);
+
+ assertEquals(response.getRequestId(), event.getId());
+ assertEquals(response.getMsg(), event.getMsg());
+ }
+}