killbill-memoizeit

Introduing EventBus into killbill

11/11/2011 6:55:47 PM

Details

diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
new file mode 100644
index 0000000..400227b
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBus.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import com.google.common.eventbus.Subscribe;
+
+
+/**
+ *
+ * EventBus API based on the guava EventBus API
+ *
+ * The API also provides an API to send events from within a transaction
+ * with the guarantee that the event will be delivered if and only if
+ * the transaction completes. If the implementation is not based on a
+ * DB, this API is behaves the same as the regular post() call.
+ *
+ */
+public interface IEventBus {
+
+
+    public class EventBusException extends Exception {
+
+        private static final long serialVersionUID = 12355236L;
+
+        public EventBusException() {
+            super();
+        }
+        public EventBusException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public EventBusException(String message) {
+            super(message);
+        }
+    }
+
+    /**
+     * Start accepting events and dispatching them
+     *
+     */
+    public void start();
+
+    /**
+     * Stop accepting events and flush event queue before it returns.
+     *
+     */
+    public void stop();
+
+    /**
+     *
+     * Registers all handler methods on {@code object} to receive events.
+     * Handler methods need to be Annotated with {@link Subscribe}
+     *
+     * @param handlerInstance
+     *
+     *  @throws EventBusException if bus not been started yet
+     */
+    public void register(IEventBusType handlerInstance) throws EventBusException;
+
+
+    /**
+     * Post an event asynchronously
+     *
+     * @param event to be posted
+     *
+     *  @throws EventBusException if bus not been started yet
+     */
+    public void post(IEventBusType event) throws EventBusException;
+
+    /**
+     *
+     * Post an event from within a transaction.
+     * Guarantees that the event is persisted on disk from within the same transaction
+     *
+     *
+     * @param event to be posted
+     * @param dao a valid DAO object obtained through the DBI.onDeamand() API.
+     *
+     *  @throws EventBusException if bus not been started yet
+     */
+    public void postFromTransaction(IEventBusType event, Transmogrifier dao) throws EventBusException;
+
+
+}
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java
new file mode 100644
index 0000000..71fa8a3
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusService.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import com.ning.billing.lifecycle.IService;
+
+public interface IEventBusService extends IService {
+
+    public IEventBus getEventBus();
+}
diff --git a/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java
new file mode 100644
index 0000000..719e2d4
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/util/eventbus/IEventBusType.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+public interface IEventBusType {
+
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java
new file mode 100644
index 0000000..d3e3c6e
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/EntitlementBillingApi.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api.billing;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.IAccount;
+import com.ning.billing.catalog.api.ICatalog;
+import com.ning.billing.entitlement.api.user.ISubscription;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.Subscription.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.entitlement.engine.dao.IEntitlementDao;
+import com.ning.billing.util.clock.IClock;
+
+public class EntitlementBillingApi implements IEntitlementBillingApi {
+
+    private final IClock clock;
+    private final IEntitlementDao dao;
+
+    @Inject
+    public EntitlementBillingApi(IClock clock, IEntitlementDao dao) {
+        super();
+        this.clock = clock;
+        this.dao = dao;
+    }
+
+    @Override
+    public List<IAccount> getActiveAccounts() {
+        return null;
+    }
+
+    @Override
+    public SortedSet<IBillingEvent> getBillingEventsForSubscription(
+            UUID subscriptionId) {
+        return null;
+    }
+
+    @Override
+    public void setChargedThroughDate(UUID subscriptionId, DateTime ctd) {
+        Subscription subscription = (Subscription) dao.getSubscriptionFromId(subscriptionId);
+        if (subscription == null) {
+            new EntitlementBillingApiException(String.format("Unknwon subscription %s", subscriptionId));
+        }
+
+        Subscription updatedSubscription = new SubscriptionBuilder()
+            .setId(subscription.getId())
+            .setBundleId(subscription.getBundleId())
+            .setStartDate(subscription.getStartDate())
+            .setBundleStartDate(subscription.getBundleStartDate())
+            .setChargedThroughDate(ctd)
+            .setPaidThroughDate(subscription.getPaidThroughDate())
+            .setActiveVersion(subscription.getActiveVersion())
+            .setCategory(subscription.getCategory())
+            .build();
+
+        dao.updateSubscription(updatedSubscription);
+    }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java b/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java
new file mode 100644
index 0000000..53c07c9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/eventbus/EventBusService.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import com.google.inject.Inject;
+import com.ning.billing.lifecycle.LyfecycleHandlerType;
+import com.ning.billing.lifecycle.LyfecycleHandlerType.LyfecycleLevel;
+
+public class EventBusService implements IEventBusService {
+
+    private final static String EVENT_BUS_SERVICE = "eventbus-service";
+
+    private final IEventBus eventBus;
+
+    @Inject
+    public EventBusService(IEventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+
+    @Override
+    public String getName() {
+        return EVENT_BUS_SERVICE;
+    }
+
+    @LyfecycleHandlerType(LyfecycleLevel.INIT_BUS)
+    public void startBus() {
+        eventBus.start();
+    }
+
+    @LyfecycleHandlerType(LyfecycleLevel.STOP_BUS)
+    public void stopBus() {
+        eventBus.stop();
+    }
+
+    @Override
+    public IEventBus getEventBus() {
+        return eventBus;
+    }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
new file mode 100644
index 0000000..0c346a1
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AsyncEventBus;
+
+public class MemoryEventBus implements IEventBus {
+
+    private final static String EVENT_BUS_IDENTIFIER = "eventbus-service";
+    private final static String EVENT_BUS_GROUP_NAME = "eventbus-grp";
+    private final static String EVENT_BUS_TH_NAME = "eventbus-th";
+
+    private static final Logger log = LoggerFactory.getLogger(MemoryEventBus.class);
+
+    private final EventBusDelegate delegate;
+    private final AtomicBoolean isInitialized;
+
+    public class EventBusDelegate extends AsyncEventBus {
+
+        private final Executor executor;
+        private final ThreadGroup grp;
+
+        public EventBusDelegate(String name, ThreadGroup grp, Executor executor) {
+            super(name, executor);
+            this.executor = executor;
+            this.grp = grp;
+        }
+
+        public void completeDispatch() {
+            dispatchQueuedEvents();
+        }
+
+        public void stop() {
+            // STEPH TBD
+        }
+    }
+
+    public MemoryEventBus() {
+
+        final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
+        Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(group, r, EVENT_BUS_TH_NAME);
+            }
+        });
+
+        this.delegate = new EventBusDelegate(EVENT_BUS_IDENTIFIER, group, executor);
+        this.isInitialized = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void register(IEventBusType handlerInstance) throws EventBusException {
+        checkInitialized("register");
+        delegate.register(handlerInstance);
+    }
+
+    @Override
+    public void post(IEventBusType event) throws EventBusException {
+        checkInitialized("post");
+        delegate.post(event);
+    }
+
+    @Override
+    public void postFromTransaction(IEventBusType event, Transmogrifier dao) throws EventBusException {
+        checkInitialized("postFromTransaction");
+        delegate.post(event);
+    }
+
+    @Override
+    public void start() {
+        if (isInitialized.compareAndSet(false, true)) {
+            log.info("MemoryEventBus started...");
+        }
+    }
+
+
+    private void checkInitialized(String operation) throws EventBusException {
+        if (!isInitialized.get()) {
+            throw new EventBusException(String.format("Attempting operation %s on an non initialized eventbus",
+                    operation));
+        }
+    }
+    @Override
+    public void stop() {
+        if (isInitialized.compareAndSet(true, false)) {
+            log.info("MemoryEventBus stopping...");
+            delegate.completeDispatch();
+            log.info("MemoryEventBus completed dispatching events...");
+            delegate.stop();
+            log.info("MemoryEventBus stoped...");
+        }
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
new file mode 100644
index 0000000..e6e995a
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.IEventBus;
+import com.ning.billing.util.eventbus.IEventBusService;
+import com.ning.billing.util.eventbus.MemoryEventBus;
+
+public class EventBusModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(IEventBusService.class).to(EventBusService.class);
+        bind(IEventBus.class).to(MemoryEventBus.class).asEagerSingleton();
+
+    }
+
+}
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
new file mode 100644
index 0000000..c69014a
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.eventbus;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+
+public class TestEventBus {
+
+    private EventBus bus;
+    private AsyncEventBus asyncBus;
+
+
+    @BeforeClass
+    public void setup() {
+
+    }
+
+
+    @Test()
+    public void test() {
+
+    }
+}