Details
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
new file mode 100644
index 0000000..400227b
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import com.google.common.eventbus.Subscribe;
+
+
+/**
+ *
+ * EventBus API based on the guava EventBus API
+ *
+ * The API also provides an API to send events from within a transaction
+ * with the guarantee that the event will be delivered if and only if
+ * the transaction completes. If the implementation is not based on a
+ * DB, this API is behaves the same as the regular post() call.
+ *
+ */
+public interface IEventBus {
+
+
+ public class EventBusException extends Exception {
+
+ private static final long serialVersionUID = 12355236L;
+
+ public EventBusException() {
+ super();
+ }
+ public EventBusException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public EventBusException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Start accepting events and dispatching them
+ *
+ */
+ public void start();
+
+ /**
+ * Stop accepting events and flush event queue before it returns.
+ *
+ */
+ public void stop();
+
+ /**
+ *
+ * Registers all handler methods on {@code object} to receive events.
+ * Handler methods need to be Annotated with {@link Subscribe}
+ *
+ * @param handlerInstance
+ *
+ * @throws EventBusException if bus not been started yet
+ */
+ public void register(IEventBusType handlerInstance) throws EventBusException;
+
+
+ /**
+ * Post an event asynchronously
+ *
+ * @param event to be posted
+ *
+ * @throws EventBusException if bus not been started yet
+ */
+ public void post(IEventBusType event) throws EventBusException;
+
+ /**
+ *
+ * Post an event from within a transaction.
+ * Guarantees that the event is persisted on disk from within the same transaction
+ *
+ *
+ * @param event to be posted
+ * @param dao a valid DAO object obtained through the DBI.onDeamand() API.
+ *
+ * @throws EventBusException if bus not been started yet
+ */
+ public void postFromTransaction(IEventBusType event, Transmogrifier dao) throws EventBusException;
+
+
+}
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java
new file mode 100644
index 0000000..71fa8a3
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import com.ning.billing.lifecycle.IService;
+
+public interface IEventBusService extends IService {
+
+ public IEventBus getEventBus();
+}
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java
new file mode 100644
index 0000000..719e2d4
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+public interface IEventBusType {
+
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java
new file mode 100644
index 0000000..d3e3c6e
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api.billing;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.IAccount;
+import com.ning.billing.catalog.api.ICatalog;
+import com.ning.billing.entitlement.api.user.ISubscription;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.Subscription.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.entitlement.engine.dao.IEntitlementDao;
+import com.ning.billing.util.clock.IClock;
+
+public class EntitlementBillingApi implements IEntitlementBillingApi {
+
+ private final IClock clock;
+ private final IEntitlementDao dao;
+
+ @Inject
+ public EntitlementBillingApi(IClock clock, IEntitlementDao dao) {
+ super();
+ this.clock = clock;
+ this.dao = dao;
+ }
+
+ @Override
+ public List<IAccount> getActiveAccounts() {
+ return null;
+ }
+
+ @Override
+ public SortedSet<IBillingEvent> getBillingEventsForSubscription(
+ UUID subscriptionId) {
+ return null;
+ }
+
+ @Override
+ public void setChargedThroughDate(UUID subscriptionId, DateTime ctd) {
+ Subscription subscription = (Subscription) dao.getSubscriptionFromId(subscriptionId);
+ if (subscription == null) {
+ new EntitlementBillingApiException(String.format("Unknwon subscription %s", subscriptionId));
+ }
+
+ Subscription updatedSubscription = new SubscriptionBuilder()
+ .setId(subscription.getId())
+ .setBundleId(subscription.getBundleId())
+ .setStartDate(subscription.getStartDate())
+ .setBundleStartDate(subscription.getBundleStartDate())
+ .setChargedThroughDate(ctd)
+ .setPaidThroughDate(subscription.getPaidThroughDate())
+ .setActiveVersion(subscription.getActiveVersion())
+ .setCategory(subscription.getCategory())
+ .build();
+
+ dao.updateSubscription(updatedSubscription);
+ }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java b/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java
new file mode 100644
index 0000000..53c07c9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import com.google.inject.Inject;
+import com.ning.billing.lifecycle.LyfecycleHandlerType;
+import com.ning.billing.lifecycle.LyfecycleHandlerType.LyfecycleLevel;
+
+public class EventBusService implements IEventBusService {
+
+ private final static String EVENT_BUS_SERVICE = "eventbus-service";
+
+ private final IEventBus eventBus;
+
+ @Inject
+ public EventBusService(IEventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ @Override
+ public String getName() {
+ return EVENT_BUS_SERVICE;
+ }
+
+ @LyfecycleHandlerType(LyfecycleLevel.INIT_BUS)
+ public void startBus() {
+ eventBus.start();
+ }
+
+ @LyfecycleHandlerType(LyfecycleLevel.STOP_BUS)
+ public void stopBus() {
+ eventBus.stop();
+ }
+
+ @Override
+ public IEventBus getEventBus() {
+ return eventBus;
+ }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
new file mode 100644
index 0000000..0c346a1
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AsyncEventBus;
+
+public class MemoryEventBus implements IEventBus {
+
+ private final static String EVENT_BUS_IDENTIFIER = "eventbus-service";
+ private final static String EVENT_BUS_GROUP_NAME = "eventbus-grp";
+ private final static String EVENT_BUS_TH_NAME = "eventbus-th";
+
+ private static final Logger log = LoggerFactory.getLogger(MemoryEventBus.class);
+
+ private final EventBusDelegate delegate;
+ private final AtomicBoolean isInitialized;
+
+ public class EventBusDelegate extends AsyncEventBus {
+
+ private final Executor executor;
+ private final ThreadGroup grp;
+
+ public EventBusDelegate(String name, ThreadGroup grp, Executor executor) {
+ super(name, executor);
+ this.executor = executor;
+ this.grp = grp;
+ }
+
+ public void completeDispatch() {
+ dispatchQueuedEvents();
+ }
+
+ public void stop() {
+ // STEPH TBD
+ }
+ }
+
+ public MemoryEventBus() {
+
+ final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
+ Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(group, r, EVENT_BUS_TH_NAME);
+ }
+ });
+
+ this.delegate = new EventBusDelegate(EVENT_BUS_IDENTIFIER, group, executor);
+ this.isInitialized = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void register(IEventBusType handlerInstance) throws EventBusException {
+ checkInitialized("register");
+ delegate.register(handlerInstance);
+ }
+
+ @Override
+ public void post(IEventBusType event) throws EventBusException {
+ checkInitialized("post");
+ delegate.post(event);
+ }
+
+ @Override
+ public void postFromTransaction(IEventBusType event, Transmogrifier dao) throws EventBusException {
+ checkInitialized("postFromTransaction");
+ delegate.post(event);
+ }
+
+ @Override
+ public void start() {
+ if (isInitialized.compareAndSet(false, true)) {
+ log.info("MemoryEventBus started...");
+ }
+ }
+
+
+ private void checkInitialized(String operation) throws EventBusException {
+ if (!isInitialized.get()) {
+ throw new EventBusException(String.format("Attempting operation %s on an non initialized eventbus",
+ operation));
+ }
+ }
+ @Override
+ public void stop() {
+ if (isInitialized.compareAndSet(true, false)) {
+ log.info("MemoryEventBus stopping...");
+ delegate.completeDispatch();
+ log.info("MemoryEventBus completed dispatching events...");
+ delegate.stop();
+ log.info("MemoryEventBus stoped...");
+ }
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
new file mode 100644
index 0000000..e6e995a
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.IEventBusService;
+import com.ning.billing.util.eventbus.MemoryEventBus;
+
+public class EventBusModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(IEventBusService.class).to(EventBusService.class);
+ bind(IEventBus.class).to(MemoryEventBus.class).asEagerSingleton();
+
+ }
+
+}
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
new file mode 100644
index 0000000..c69014a
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+
+public class TestEventBus {
+
+ private EventBus bus;
+ private AsyncEventBus asyncBus;
+
+
+ @BeforeClass
+ public void setup() {
+
+ }
+
+
+ @Test()
+ public void test() {
+
+ }
+}