Details
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
index 1cdba7b..7aab8c8 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/DefaultBeatrixService.java
@@ -19,9 +19,7 @@ package com.ning.billing.beatrix;
import javax.inject.Inject;
import com.ning.billing.beatrix.bus.api.BeatrixService;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.beatrix.extbus.BeatrixListener;
-import com.ning.billing.beatrix.extbus.PersistentExternalBus;
import com.ning.billing.bus.PersistentBus;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
@@ -33,10 +31,10 @@ public class DefaultBeatrixService implements BeatrixService {
private final BeatrixListener beatrixListener;
private final PersistentBus eventBus;
- private final ExternalBus externalBus;
+ private final PersistentBus externalBus;
@Inject
- public DefaultBeatrixService(final PersistentBus eventBus, final ExternalBus externalBus, final BeatrixListener beatrixListener) {
+ public DefaultBeatrixService(final PersistentBus eventBus, final PersistentBus externalBus, final BeatrixListener beatrixListener) {
this.eventBus = eventBus;
this.externalBus = externalBus;
this.beatrixListener = beatrixListener;
@@ -68,12 +66,11 @@ public class DefaultBeatrixService implements BeatrixService {
@LifecycleHandlerType(LifecycleLevel.INIT_BUS)
public void startBus() {
- ((PersistentExternalBus) externalBus).startQueue();
+ externalBus.start();
}
@LifecycleHandlerType(LifecycleLevel.STOP_BUS)
public void stopBus() {
- ((PersistentExternalBus) externalBus).stopQueue();
+ externalBus.stop();
}
-
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index bfba1d1..0f76c4f 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.ObjectType;
import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountApiException;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.bus.BusPersistentEvent;
+import com.ning.billing.bus.PersistentBus;
import com.ning.billing.bus.PersistentBus.EventBusException;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.notification.plugin.api.ExtBusEventType;
@@ -62,7 +62,7 @@ public class BeatrixListener {
private static final Logger log = LoggerFactory.getLogger(BeatrixListener.class);
- private final ExternalBus externalBus;
+ private final PersistentBus externalBus;
private final InternalCallContextFactory internalCallContextFactory;
private final AccountInternalApi accountApi;
@@ -70,7 +70,7 @@ public class BeatrixListener {
protected final ObjectMapper objectMapper;
@Inject
- public BeatrixListener(final ExternalBus externalBus,
+ public BeatrixListener(final PersistentBus externalBus,
final InternalCallContextFactory internalCallContextFactory,
final AccountInternalApi accountApi) {
this.externalBus = externalBus;
@@ -88,7 +88,7 @@ public class BeatrixListener {
try {
final BusPersistentEvent externalEvent = computeExtBusEventEntryFromBusInternalEvent(event, internalContext);
if (externalEvent != null) {
- ((PersistentExternalBus) externalBus).post(externalEvent, internalContext);
+ externalBus.post(externalEvent);
}
} catch (EventBusException e) {
log.warn("Failed to dispatch external bus events", e);
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/glue/BeatrixModule.java b/beatrix/src/main/java/com/ning/billing/beatrix/glue/BeatrixModule.java
index 943f070..4a6b721 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/glue/BeatrixModule.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/glue/BeatrixModule.java
@@ -18,26 +18,40 @@ package com.ning.billing.beatrix.glue;
import com.ning.billing.beatrix.DefaultBeatrixService;
import com.ning.billing.beatrix.bus.api.BeatrixService;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.beatrix.extbus.BeatrixListener;
-import com.ning.billing.beatrix.extbus.PersistentExternalBus;
import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
import com.ning.billing.beatrix.lifecycle.Lifecycle;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.util.glue.BusProvider;
import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
public class BeatrixModule extends AbstractModule {
+ // TODO This has to match with the DLL obviously to work
+ private final static String EXTERNAL_BUS_TABLE_NAME = "bus_ext_events";
+
+ public static final String EXTERNAL_BUS = "externalBus";
+
@Override
protected void configure() {
- bind(Lifecycle.class).to(DefaultLifecycle.class).asEagerSingleton();
+ installLifecycle();
installExternalBus();
}
+ protected void installLifecycle() {
+ bind(Lifecycle.class).to(DefaultLifecycle.class).asEagerSingleton();
+ }
+
protected void installExternalBus() {
bind(BeatrixService.class).to(DefaultBeatrixService.class);
bind(DefaultBeatrixService.class).asEagerSingleton();
- bind(ExternalBus.class).to(PersistentExternalBus.class).asEagerSingleton();
+
+ bind(BusProvider.class).annotatedWith(Names.named(EXTERNAL_BUS)).toInstance(new BusProvider(EXTERNAL_BUS_TABLE_NAME));
+ bind(PersistentBus.class).annotatedWith(Names.named(EXTERNAL_BUS)).toProvider(Key.get(BusProvider.class, Names.named(EXTERNAL_BUS))).asEagerSingleton();
+
bind(BeatrixListener.class).asEagerSingleton();
}
}
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixIntegrationModule.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixIntegrationModule.java
index c22d3d6..ce967ef 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixIntegrationModule.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixIntegrationModule.java
@@ -26,9 +26,7 @@ import com.ning.billing.GuicyKillbillTestWithEmbeddedDBModule;
import com.ning.billing.account.api.AccountService;
import com.ning.billing.account.glue.DefaultAccountModule;
import com.ning.billing.beatrix.DefaultBeatrixService;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
-import com.ning.billing.beatrix.extbus.BeatrixListener;
-import com.ning.billing.beatrix.extbus.PersistentExternalBus;
+import com.ning.billing.beatrix.glue.BeatrixModule;
import com.ning.billing.beatrix.integration.overdue.IntegrationTestOverdueModule;
import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
import com.ning.billing.beatrix.lifecycle.Lifecycle;
@@ -98,8 +96,6 @@ public class BeatrixIntegrationModule extends AbstractModule {
loadSystemPropertiesFromClasspath("/beatrix.properties");
- bind(Lifecycle.class).to(SubsetDefaultLifecycle.class).asEagerSingleton();
-
install(new GuicyKillbillTestWithEmbeddedDBModule());
install(new GlobalLockerModule());
@@ -125,6 +121,7 @@ public class BeatrixIntegrationModule extends AbstractModule {
install(new DefaultOSGIModule(configSource));
install(new NonEntityDaoModule());
install(new RecordIdModule());
+ install(new BeatrixModuleWithSubsetLifecycle());
bind(AccountChecker.class).asEagerSingleton();
bind(EntitlementChecker.class).asEagerSingleton();
@@ -132,15 +129,8 @@ public class BeatrixIntegrationModule extends AbstractModule {
bind(PaymentChecker.class).asEagerSingleton();
bind(RefundChecker.class).asEagerSingleton();
bind(AuditChecker.class).asEagerSingleton();
-
- installPublicBus();
}
- private void installPublicBus() {
- bind(ExternalBus.class).to(PersistentExternalBus.class).asEagerSingleton();
- bind(BeatrixListener.class).asEagerSingleton();
- bind(DefaultBeatrixService.class).asEagerSingleton();
- }
private static final class DefaultInvoiceModuleWithSwitchRepairLogic extends DefaultInvoiceModule {
@@ -200,4 +190,12 @@ public class BeatrixIntegrationModule extends AbstractModule {
return services;
}
}
+
+ private static final class BeatrixModuleWithSubsetLifecycle extends BeatrixModule {
+
+ @Override
+ protected void installLifecycle() {
+ bind(Lifecycle.class).to(SubsetDefaultLifecycle.class).asEagerSingleton();
+ }
+ }
}
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
index 07998cc..2b0730f 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
@@ -42,7 +42,6 @@ import com.ning.billing.api.TestApiListener;
import com.ning.billing.api.TestApiListener.NextEvent;
import com.ning.billing.api.TestListenerStatus;
import com.ning.billing.beatrix.BeatrixTestSuiteWithEmbeddedDB;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.beatrix.lifecycle.Lifecycle;
import com.ning.billing.beatrix.osgi.SetupBundleWithAssertion;
import com.ning.billing.beatrix.util.AccountChecker;
@@ -50,6 +49,7 @@ import com.ning.billing.beatrix.util.EntitlementChecker;
import com.ning.billing.beatrix.util.InvoiceChecker;
import com.ning.billing.beatrix.util.PaymentChecker;
import com.ning.billing.beatrix.util.RefundChecker;
+import com.ning.billing.bus.PersistentBus;
import com.ning.billing.catalog.api.BillingPeriod;
import com.ning.billing.catalog.api.Currency;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
@@ -180,7 +180,7 @@ public class TestIntegrationBase extends BeatrixTestSuiteWithEmbeddedDB implemen
protected AccountChecker accountChecker;
@Inject
- protected ExternalBus externalBus;
+ protected PersistentBus externalBus;
@Inject
protected RefundChecker refundChecker;
diff --git a/osgi/src/main/java/com/ning/billing/osgi/KillbillEventObservable.java b/osgi/src/main/java/com/ning/billing/osgi/KillbillEventObservable.java
index e4ba9c9..45a8e36 100644
--- a/osgi/src/main/java/com/ning/billing/osgi/KillbillEventObservable.java
+++ b/osgi/src/main/java/com/ning/billing/osgi/KillbillEventObservable.java
@@ -23,7 +23,8 @@ import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
import com.ning.billing.notification.plugin.api.ExtBusEvent;
import com.google.common.eventbus.Subscribe;
@@ -33,18 +34,18 @@ public class KillbillEventObservable extends Observable {
private Logger logger = LoggerFactory.getLogger(KillbillEventObservable.class);
- private final ExternalBus externalBus;
+ private final PersistentBus externalBus;
@Inject
- public KillbillEventObservable(final ExternalBus externalBus) {
+ public KillbillEventObservable(final PersistentBus externalBus) {
this.externalBus = externalBus;
}
- public void register() {
+ public void register() throws EventBusException {
externalBus.register(this);
}
- public void unregister() {
+ public void unregister() throws EventBusException {
deleteObservers();
if (externalBus != null) {
externalBus.unregister(this);
diff --git a/server/src/main/java/com/ning/billing/server/DefaultServerService.java b/server/src/main/java/com/ning/billing/server/DefaultServerService.java
index ad68e2b..56f7040 100644
--- a/server/src/main/java/com/ning/billing/server/DefaultServerService.java
+++ b/server/src/main/java/com/ning/billing/server/DefaultServerService.java
@@ -17,21 +17,27 @@ package com.ning.billing.server;
import javax.inject.Inject;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBus.EventBusException;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
import com.ning.billing.server.notifications.PushNotificationListener;
public class DefaultServerService implements ServerService {
+ private final static Logger log = LoggerFactory.getLogger(DefaultServerService.class);
+
private final static String SERVER_SERVICE = "server-service";
- private final ExternalBus bus;
+ private final PersistentBus bus;
private final PushNotificationListener pushNotificationListener;
@Inject
- public DefaultServerService(final ExternalBus bus, final PushNotificationListener pushNotificationListener) {
+ public DefaultServerService(final PersistentBus bus, final PushNotificationListener pushNotificationListener) {
this.bus = bus;
this.pushNotificationListener = pushNotificationListener;
}
@@ -43,11 +49,19 @@ public class DefaultServerService implements ServerService {
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void registerForNotifications() {
- bus.register(pushNotificationListener);
+ try {
+ bus.register(pushNotificationListener);
+ } catch (EventBusException e) {
+ log.warn("Failed to initialize Server service :", e);
+ }
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void unregisterForNotifications() {
- bus.unregister(pushNotificationListener);
+ try {
+ bus.unregister(pushNotificationListener);
+ } catch (EventBusException e) {
+ log.warn("Failed to stop Server service :", e);
+ }
}
}
diff --git a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
index 31065db..f1d9da2 100644
--- a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
+++ b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
@@ -24,7 +24,6 @@ import javax.servlet.ServletContextEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.beatrix.bus.api.ExternalBus;
import com.ning.billing.beatrix.lifecycle.DefaultLifecycle;
import com.ning.billing.bus.PersistentBus;
import com.ning.billing.jaxrs.resources.JaxRsResourceBase;
@@ -75,7 +74,6 @@ public class KillbillGuiceListener extends SetupServer {
.addJMXExport(KillbillHealthcheck.class)
.addJMXExport(NotificationQueueService.class)
.addJMXExport(PersistentBus.class)
- .addJMXExport(ExternalBus.class)
.addModule(getModule())
// Don't filter all requests through Jersey, only the JAX-RS APIs (otherwise,
// things like static resources, favicon, etc. are 404'ed)
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 34afc53..eafcb5b 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -19,8 +19,6 @@ package com.ning.billing.util.glue;
import org.skife.config.ConfigSource;
import org.skife.config.ConfigurationObjectFactory;
-import com.ning.billing.bus.BusTableName;
-import com.ning.billing.bus.DefaultPersistentBus;
import com.ning.billing.bus.InMemoryPersistentBus;
import com.ning.billing.bus.PersistentBus;
import com.ning.billing.bus.PersistentBusConfig;
@@ -28,7 +26,6 @@ import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.svcsapi.bus.BusService;
import com.google.inject.AbstractModule;
-import com.google.inject.name.Names;
public class BusModule extends AbstractModule {
@@ -78,8 +75,8 @@ public class BusModule extends AbstractModule {
private void configurePersistentEventBus() {
configurePersistentBusConfig();
- bind(String.class).annotatedWith(BusTableName.class).toInstance(tableName);
- bind(PersistentBus.class).to(DefaultPersistentBus.class).asEagerSingleton();
+ bind(BusProvider.class).toInstance(new BusProvider(tableName));
+ bind(PersistentBus.class).toProvider(BusProvider.class).asEagerSingleton();
}
private void configureInMemoryEventBus() {
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusProvider.java b/util/src/main/java/com/ning/billing/util/glue/BusProvider.java
new file mode 100644
index 0000000..99e20dd
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/BusProvider.java
@@ -0,0 +1,37 @@
+package com.ning.billing.util.glue;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import org.skife.jdbi.v2.IDBI;
+
+import com.ning.billing.bus.DefaultPersistentBus;
+import com.ning.billing.bus.PersistentBus;
+import com.ning.billing.bus.PersistentBusConfig;
+import com.ning.billing.util.clock.Clock;
+
+
+public class BusProvider implements Provider<PersistentBus> {
+
+ private IDBI dbi;
+ private Clock clock;
+ private PersistentBusConfig busConfig;
+ private String tableName;
+
+ public BusProvider(final String tableName) {
+ this.tableName = tableName;
+ }
+
+ @Inject
+ public void initialize(final IDBI dbi, final Clock clock, final PersistentBusConfig config) {
+ this.dbi = dbi;
+ this.clock = clock;
+ this.busConfig = config;
+ }
+
+
+ @Override
+ public PersistentBus get() {
+ return new DefaultPersistentBus(dbi, clock, busConfig, tableName);
+ }
+}