killbill-uncached

Changes

Details

diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index c4671ea..587a72a 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -30,16 +30,16 @@ import com.ning.billing.account.api.user.DefaultAccountChangeNotification;
 import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
 import com.ning.billing.util.customfield.CustomField;
 import com.ning.billing.util.customfield.dao.FieldStoreDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.dao.TagStoreDao;
 
 public class DefaultAccountDao implements AccountDao {
     private final AccountSqlDao accountDao;
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public DefaultAccountDao(IDBI dbi, EventBus eventBus) {
+    public DefaultAccountDao(IDBI dbi, Bus eventBus) {
         this.eventBus = eventBus;
         this.accountDao = dbi.onDemand(AccountSqlDao.class);
     }
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index b18a41b..d205439 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -21,7 +21,7 @@ import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.account.glue.AccountModuleMock;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
 import org.apache.commons.io.IOUtils;
 import org.skife.jdbi.v2.IDBI;
 import org.testng.annotations.AfterClass;
@@ -56,7 +56,7 @@ public abstract class AccountDaoTestBase {
             accountDao = injector.getInstance(AccountDao.class);
             accountDao.test();
 
-            EventBusService busService = injector.getInstance(EventBusService.class);
+            BusService busService = injector.getInstance(BusService.class);
             ((DefaultEventBusService) busService).startBus();
         }
         catch (Throwable t) {
diff --git a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
index a08e3ab..e537841 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
@@ -19,7 +19,7 @@ package com.ning.billing.analytics.api;
 import com.google.inject.Inject;
 import com.ning.billing.analytics.AnalyticsListener;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,10 +30,10 @@ public class AnalyticsService implements IAnalyticsService
     private static final String ANALYTICS_SERVICE = "analytics-service";
 
     private final AnalyticsListener listener;
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public AnalyticsService(final AnalyticsListener listener, final EventBus eventBus)
+    public AnalyticsService(final AnalyticsListener listener, final Bus eventBus)
     {
         this.listener = listener;
         this.eventBus = eventBus;
@@ -51,7 +51,7 @@ public class AnalyticsService implements IAnalyticsService
         try {
             eventBus.register(listener);
         }
-        catch (EventBus.EventBusException e) {
+        catch (Bus.EventBusException e) {
             log.error("Unable to register to the EventBus!", e);
         }
     }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index c146de7..2feffda 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -47,7 +47,7 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import com.ning.billing.util.tag.DefaultTag;
 import com.ning.billing.util.tag.DefaultTagDescription;
 import com.ning.billing.util.tag.Tag;
@@ -90,7 +90,7 @@ public class TestAnalyticsService
     private AnalyticsService service;
 
     @Inject
-    private EventBus bus;
+    private Bus bus;
 
     @Inject
     private BusinessSubscriptionTransitionDao subscriptionDao;
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
index 9e5f254..2bc40f8 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
@@ -16,12 +16,12 @@
 
 package com.ning.billing.account.api;
 
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 
 import java.util.List;
 import java.util.UUID;
 
-public interface AccountChangeNotification extends EventBusNotification {
+public interface AccountChangeNotification extends BusEvent {
     public UUID getAccountId();
 
     public List<ChangedField> getChangedFields();
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
index bc2c065..22a1752 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
@@ -16,11 +16,11 @@
 
 package com.ning.billing.account.api;
 
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 
 import java.util.UUID;
 
-public interface AccountCreationNotification extends EventBusNotification {
+public interface AccountCreationNotification extends BusEvent {
     public UUID getId();
 
     public AccountData getData();
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
new file mode 100644
index 0000000..a2b4270
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+public interface InvoiceConfig {
+
+    @Config("killbill.invoice.dao.claim.time")
+    @Default("60000")
+    public long getDaoClaimTimeMs();
+
+    @Config("killbill.invoice.dao.ready.max")
+    @Default("10")
+    public int getDaoMaxReadyEvents();
+
+    @Config("killbill.invoice.engine.notifications.sleep")
+    @Default("500")
+    public long getNotificationSleepTimeMs();
+
+    @Config("killbill.invoice.engine.events.off")
+    @Default("false")
+    public boolean isEventProcessingOff();
+}
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
index 26ce81f..449c368 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
@@ -19,12 +19,12 @@ package com.ning.billing.entitlement.api.user;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 import org.joda.time.DateTime;
 
 import java.util.UUID;
 
-public interface SubscriptionTransition extends EventBusNotification {
+public interface SubscriptionTransition extends BusEvent {
 
     public enum SubscriptionTransitionType {
         MIGRATE_ENTITLEMENT,
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
index 89c0d87..7903647 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
@@ -17,13 +17,13 @@
 package com.ning.billing.invoice.api;
 
 import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 import org.joda.time.DateTime;
 
 import java.math.BigDecimal;
 import java.util.UUID;
 
-public interface InvoiceCreationNotification extends EventBusNotification {
+public interface InvoiceCreationNotification extends BusEvent {
     public UUID getInvoiceId();
     public UUID getAccountId();
     public BigDecimal getAmountOwed();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 7bd1124..86fd78b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -18,7 +18,6 @@ package com.ning.billing.entitlement.engine.core;
 
 import java.util.UUID;
 
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
 import com.ning.billing.util.notificationq.NotificationConfig;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -59,10 +58,6 @@ public class Engine implements EventListener, EntitlementService {
     public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
     public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
-    private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
-    private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
-    private final long NANO_TO_MS = (1000 * 1000);
-
     private final static Logger log = LoggerFactory.getLogger(Engine.class);
 
     private final Clock clock;
@@ -72,19 +67,17 @@ public class Engine implements EventListener, EntitlementService {
     private final EntitlementBillingApi billingApi;
     private final EntitlementTestApi testApi;
     private final EntitlementMigrationApi migrationApi;
-    private final EventBus eventBus;
+    private final Bus eventBus;
     private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
 
-    private boolean startedNotificationThread;
-    private boolean stoppedNotificationThread;
     private NotificationQueue subscritionEventQueue;
 
     @Inject
     public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
             EntitlementConfig config, DefaultEntitlementUserApi userApi,
             DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
-            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+            DefaultEntitlementMigrationApi migrationApi, Bus eventBus,
             NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
@@ -108,8 +101,6 @@ public class Engine implements EventListener, EntitlementService {
     public void initialize() {
 
         try {
-            this.stoppedNotificationThread = false;
-            this.startedNotificationThread = false;
             subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
                     NOTIFICATION_QUEUE_NAME,
                     new NotificationQueueHandler() {
@@ -122,21 +113,6 @@ public class Engine implements EventListener, EntitlementService {
                         processEventReady(event);
                     }
                 }
-
-                @Override
-                public void completedQueueStop() {
-                    synchronized (this) {
-                        stoppedNotificationThread = true;
-                        this.notifyAll();
-                    }
-                }
-                @Override
-                public void completedQueueStart() {
-                    synchronized (this) {
-                        startedNotificationThread = true;
-                        this.notifyAll();
-                    }
-                }
             },
             new NotificationConfig() {
                 @Override
@@ -164,16 +140,13 @@ public class Engine implements EventListener, EntitlementService {
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
         subscritionEventQueue.startQueue();
-        waitForNotificationStartCompletion();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
         if (subscritionEventQueue != null) {
             subscritionEventQueue.stopQueue();
-            waitForNotificationStopCompletion();
-        }
-        startedNotificationThread = false;
+         }
     }
 
     @Override
@@ -218,43 +191,6 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-    private void waitForNotificationStartCompletion() {
-        waitForNotificationEventCompletion(true);
-    }
-
-    private void waitForNotificationStopCompletion() {
-        waitForNotificationEventCompletion(false);
-    }
-
-    private void waitForNotificationEventCompletion(boolean startEvent) {
-
-        long ini = System.nanoTime();
-        synchronized(this) {
-            do {
-                if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
-                    break;
-                }
-                try {
-                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
-                } catch (InterruptedException e ) {
-                    Thread.currentThread().interrupt();
-                    throw new EntitlementError(e);
-                }
-            } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
-                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
-
-            if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
-                log.error("Could not {} notification thread in {} msec !!!",
-                        (startEvent ? "start" : "stop"),
-                        MAX_NOTIFICATION_THREAD_WAIT_MS);
-                throw new EntitlementError("Failed to start service!!");
-            }
-            log.info("Notification thread has been {} in {} ms",
-                    (startEvent ? "started" : "stopped"),
-                    (System.nanoTime() - ini) / NANO_TO_MS);
-        }
-    }
-
     private void insertNextPhaseEvent(SubscriptionData subscription) {
         try {
             DateTime now = clock.getUTCNow();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
index 6bee471..7241611 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
@@ -19,7 +19,7 @@ package com.ning.billing.entitlement.api;
 import com.google.common.base.Joiner;
 import com.google.common.eventbus.Subscribe;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,7 @@ public class ApiTestListener {
         PHASE
     }
 
-    public ApiTestListener(EventBus eventBus) {
+    public ApiTestListener(Bus eventBus) {
         this.nextExpectedEvent = new Stack<NextEvent>();
         this.completed = false;
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 9a9d80b..adcf53f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -62,7 +62,7 @@ import com.ning.billing.lifecycle.KillbillService.ServiceException;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -86,7 +86,7 @@ public abstract class TestApiBase {
     protected EntitlementConfig config;
     protected EntitlementDao dao;
     protected ClockMock clock;
-    protected EventBusService busService;
+    protected BusService busService;
 
     protected AccountData accountData;
     protected Catalog catalog;
@@ -108,7 +108,7 @@ public abstract class TestApiBase {
     @AfterClass(groups={"setup"})
     public void tearDown() {
         try {
-            busService.getEventBus().register(testListener);
+            busService.getBus().register(testListener);
             ((DefaultEventBusService) busService).stopBus();
         } catch (Exception e) {
             log.warn("Failed to tearDown test properly ", e);
@@ -124,7 +124,7 @@ public abstract class TestApiBase {
 
         entitlementService = g.getInstance(EntitlementService.class);
         catalogService = g.getInstance(CatalogService.class);
-        busService = g.getInstance(EventBusService.class);
+        busService = g.getInstance(BusService.class);
         config = g.getInstance(EntitlementConfig.class);
         dao = g.getInstance(EntitlementDao.class);
         clock = (ClockMock) g.getInstance(Clock.class);
@@ -151,7 +151,7 @@ public abstract class TestApiBase {
         assertNotNull(catalog);
 
 
-        testListener = new ApiTestListener(busService.getEventBus());
+        testListener = new ApiTestListener(busService.getBus());
         entitlementApi = entitlementService.getUserApi();
         billingApi = entitlementService.getBillingApi();
         migrationApi = entitlementService.getMigrationApi();
@@ -169,7 +169,7 @@ public abstract class TestApiBase {
         clock.resetDeltaFromReality();
         ((MockEntitlementDao) dao).reset();
         try {
-            busService.getEventBus().register(testListener);
+            busService.getBus().register(testListener);
             UUID accountId = UUID.randomUUID();
             bundle = entitlementApi.createBundleForAccount(accountId, "myDefaultBundle");
         } catch (Exception e) {
diff --git a/entitlement/src/test/resources/entitlement.properties b/entitlement/src/test/resources/entitlement.properties
index d149d78..227aa7e 100644
--- a/entitlement/src/test/resources/entitlement.properties
+++ b/entitlement/src/test/resources/entitlement.properties
@@ -1,4 +1,4 @@
-killbill.catalog.uri=file:src/test/resources/testInput.xml
+killbill.catalog.uri=file:src/test/resources/versionedCatalog
 killbill.entitlement.dao.claim.time=60000
 killbill.entitlement.dao.ready.max=1
 killbill.entitlement.engine.notifications.sleep=500
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml
new file mode 100644
index 0000000..c21aac1
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-1.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  ~ Copyright 2010-2011 Ning, Inc.
+  ~
+  ~ Ning licenses this file to you under the Apache License, version 2.0
+  ~ (the "License"); you may not use this file except in compliance with the
+  ~ License.  You may obtain a copy of the License at:
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+	<effectiveDate>2011-01-01T00:00:00+00:00</effectiveDate>
+	<catalogName>WeaponsHireSmall</catalogName>
+
+	<currencies>
+		<currency>USD</currency>
+		<currency>EUR</currency>
+		<currency>GBP</currency>
+	</currencies>
+
+	<products>
+		<product name="Pistol">
+			<category>BASE</category>
+		</product>
+		<product name="Shotgun">
+			<category>BASE</category>
+		</product>
+		<product name="Laser-Scope">
+			<category>ADD_ON</category>
+		</product>
+	</products>
+	 
+	<rules>
+		<changePolicy>
+			<changePolicyCase> 
+				<fromBillingPeriod>MONTHLY</fromBillingPeriod>
+				<toProduct>Shotgun</toProduct>
+				<toBillingPeriod>MONTHLY</toBillingPeriod>
+				<policy>END_OF_TERM</policy>
+			</changePolicyCase>
+			<changePolicyCase> 
+				<phaseType>TRIAL</phaseType>
+				<policy>IMMEDIATE</policy>
+			</changePolicyCase>	
+		</changePolicy>
+		<changeAlignment>
+			<changeAlignmentCase>
+				<alignment>START_OF_SUBSCRIPTION</alignment>
+			</changeAlignmentCase>
+		</changeAlignment>
+		 <createAlignment>
+            <createAlignmentCase>
+                <product>Laser-Scope</product>
+                <alignment>START_OF_SUBSCRIPTION</alignment>
+            </createAlignmentCase>
+            <createAlignmentCase>
+                <alignment>START_OF_BUNDLE</alignment>
+            </createAlignmentCase>
+        </createAlignment>
+	</rules>
+
+
+	<plans>
+		<plan name="pistol-monthly">
+			<product>Pistol</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice> <!-- empty price implies $0 -->
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>GBP</currency><value>1.0</value></price>
+					<price><currency>EUR</currency><value>1.0</value></price> 
+					<price><currency>USD</currency><value>1.0</value></price>								
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-monthly">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice></fixedPrice>
+				    <!-- no price implies $0 -->
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+					<number>-1</number>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>249.95</value></price>								
+					<price><currency>EUR</currency><value>149.95</value></price>
+					<price><currency>GBP</currency><value>169.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-annual">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice>
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>ANNUAL</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>2399.95</value></price>								
+					<price><currency>EUR</currency><value>1499.95</value></price>
+					<price><currency>GBP</currency><value>1699.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+	</plans>
+	<priceLists>
+		<defaultPriceList name="DEFAULT">			
+				<plans>	
+				<plan>pistol-monthly</plan>
+				<plan>shotgun-monthly</plan>
+				<plan>shotgun-annual</plan>
+			</plans>
+		</defaultPriceList>
+	</priceLists>
+</catalog>
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml
new file mode 100644
index 0000000..0e650f1
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-2.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  ~ Copyright 2010-2011 Ning, Inc.
+  ~
+  ~ Ning licenses this file to you under the Apache License, version 2.0
+  ~ (the "License"); you may not use this file except in compliance with the
+  ~ License.  You may obtain a copy of the License at:
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+	<effectiveDate>2011-02-02T00:00:00+00:00</effectiveDate>
+	<catalogName>WeaponsHireSmall</catalogName>
+
+	<currencies>
+		<currency>USD</currency>
+		<currency>EUR</currency>
+		<currency>GBP</currency>
+	</currencies>
+
+	<products>
+		<product name="Pistol">
+			<category>BASE</category>
+		</product>
+		<product name="Shotgun">
+			<category>BASE</category>
+		</product>
+		<product name="Laser-Scope">
+			<category>ADD_ON</category>
+		</product>
+	</products>
+	 
+	<rules>
+		<changePolicy>
+			<changePolicyCase> 
+				<fromBillingPeriod>MONTHLY</fromBillingPeriod>
+				<toProduct>Shotgun</toProduct>
+				<toBillingPeriod>MONTHLY</toBillingPeriod>
+				<policy>END_OF_TERM</policy>
+			</changePolicyCase>
+			<changePolicyCase> 
+				<phaseType>TRIAL</phaseType>
+				<policy>IMMEDIATE</policy>
+			</changePolicyCase>	
+		</changePolicy>
+		<changeAlignment>
+			<changeAlignmentCase>
+				<alignment>START_OF_SUBSCRIPTION</alignment>
+			</changeAlignmentCase>
+		</changeAlignment>
+		 <createAlignment>
+            <createAlignmentCase>
+                <product>Laser-Scope</product>
+                <alignment>START_OF_SUBSCRIPTION</alignment>
+            </createAlignmentCase>
+            <createAlignmentCase>
+                <alignment>START_OF_BUNDLE</alignment>
+            </createAlignmentCase>
+        </createAlignment>
+	</rules>
+
+	<plans>
+		<plan name="pistol-monthly">
+			<effectiveDateForExistingSubscriptons>2011-02-14T00:00:00+00:00</effectiveDateForExistingSubscriptons>
+			<product>Pistol</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice> <!-- empty price implies $0 -->
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>GBP</currency><value>2.0</value></price>
+					<price><currency>EUR</currency><value>2.0</value></price> 
+					<price><currency>USD</currency><value>2.0</value></price>								
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-monthly">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice></fixedPrice>
+				    <!-- no price implies $0 -->
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+					<number>-1</number>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>249.95</value></price>								
+					<price><currency>EUR</currency><value>149.95</value></price>
+					<price><currency>GBP</currency><value>169.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-annual">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice>
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>ANNUAL</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>2399.95</value></price>								
+					<price><currency>EUR</currency><value>1499.95</value></price>
+					<price><currency>GBP</currency><value>1699.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+	</plans>
+	<priceLists>
+		<defaultPriceList name="DEFAULT">			
+				<plans>	
+				<plan>pistol-monthly</plan>
+				<plan>shotgun-monthly</plan>
+				<plan>shotgun-annual</plan>
+			</plans>
+		</defaultPriceList>
+	</priceLists>
+</catalog>
diff --git a/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml
new file mode 100644
index 0000000..f7ae066
--- /dev/null
+++ b/entitlement/src/test/resources/versionedCatalog/WeaponsHireSmall-3.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  ~ Copyright 2010-2011 Ning, Inc.
+  ~
+  ~ Ning licenses this file to you under the Apache License, version 2.0
+  ~ (the "License"); you may not use this file except in compliance with the
+  ~ License.  You may obtain a copy of the License at:
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+  ~ License for the specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<catalog xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:noNamespaceSchemaLocation="../CatalogSchema.xsd ">
+
+	<effectiveDate>2011-03-03T00:00:00+00:00</effectiveDate>
+	<catalogName>WeaponsHireSmall</catalogName>
+
+	<currencies>
+		<currency>USD</currency>
+		<currency>EUR</currency>
+		<currency>GBP</currency>
+	</currencies>
+
+	<products>
+		<product name="Pistol">
+			<category>BASE</category>
+		</product>
+		<product name="Shotgun">
+			<category>BASE</category>
+		</product>
+		<product name="Laser-Scope">
+			<category>ADD_ON</category>
+		</product>
+	</products>
+	 
+	<rules>
+		<changePolicy>
+			<changePolicyCase> 
+				<fromBillingPeriod>MONTHLY</fromBillingPeriod>
+				<toProduct>Shotgun</toProduct>
+				<toBillingPeriod>MONTHLY</toBillingPeriod>
+				<policy>END_OF_TERM</policy>
+			</changePolicyCase>
+			<changePolicyCase> 
+				<phaseType>TRIAL</phaseType>
+				<policy>IMMEDIATE</policy>
+			</changePolicyCase>	
+		</changePolicy>
+		<changeAlignment>
+			<changeAlignmentCase>
+				<alignment>START_OF_SUBSCRIPTION</alignment>
+			</changeAlignmentCase>
+		</changeAlignment>
+		 <createAlignment>
+            <createAlignmentCase>
+                <product>Laser-Scope</product>
+                <alignment>START_OF_SUBSCRIPTION</alignment>
+            </createAlignmentCase>
+            <createAlignmentCase>
+                <alignment>START_OF_BUNDLE</alignment>
+            </createAlignmentCase>
+        </createAlignment>
+	</rules>
+
+
+	<plans>
+		<plan name="pistol-monthly">
+			<product>Pistol</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice> <!-- empty price implies $0 -->
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>GBP</currency><value>3.0</value></price>
+					<price><currency>EUR</currency><value>3.0</value></price> 
+					<price><currency>USD</currency><value>3.0</value></price>								
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-monthly">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice></fixedPrice>
+				    <!-- no price implies $0 -->
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+					<number>-1</number>
+				</duration>
+				<billingPeriod>MONTHLY</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>249.95</value></price>								
+					<price><currency>EUR</currency><value>149.95</value></price>
+					<price><currency>GBP</currency><value>169.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+		<plan name="shotgun-annual">
+			<product>Shotgun</product>
+			<initialPhases>
+				<phase type="TRIAL">
+					<duration>
+						<unit>DAYS</unit>
+						<number>30</number>
+					</duration>
+					<billingPeriod>NO_BILLING_PERIOD</billingPeriod>
+					<fixedPrice>
+					</fixedPrice>
+				</phase>
+			</initialPhases>
+			<finalPhase type="EVERGREEN">
+				<duration>
+					<unit>UNLIMITED</unit>
+				</duration>
+				<billingPeriod>ANNUAL</billingPeriod>
+				<recurringPrice>
+					<price><currency>USD</currency><value>2399.95</value></price>								
+					<price><currency>EUR</currency><value>1499.95</value></price>
+					<price><currency>GBP</currency><value>1699.95</value></price>
+				</recurringPrice>
+			</finalPhase>
+		</plan>
+	</plans>
+	<priceLists>
+		<defaultPriceList name="DEFAULT">			
+				<plans>	
+				<plan>pistol-monthly</plan>
+				<plan>shotgun-monthly</plan>
+				<plan>shotgun-annual</plan>
+			</plans>
+		</defaultPriceList>
+	</priceLists>
+</catalog>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index 22f4990..d9e29f0 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -22,7 +22,7 @@ import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceUserApi;
 import com.ning.billing.invoice.dao.DefaultInvoiceDao;
 import com.ning.billing.invoice.dao.InvoiceDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 20f33c7..8612eb4 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -23,7 +23,7 @@ import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceCreationNotification;
 import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -38,11 +38,11 @@ import java.util.UUID;
 public class DefaultInvoiceDao implements InvoiceDao {
     private final InvoiceSqlDao invoiceDao;
 
-    private final EventBus eventBus;
+    private final Bus eventBus;
     private final static Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
 
     @Inject
-    public DefaultInvoiceDao(final IDBI dbi, final EventBus eventBus) {
+    public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus) {
         this.invoiceDao = dbi.onDemand(InvoiceSqlDao.class);
         this.eventBus = eventBus;
     }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
new file mode 100644
index 0000000..59ec8a2
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import com.ning.billing.util.eventbus.BusEvent;
+
+public class NextBillingDateEvent implements BusEvent{
+	private final UUID subscriptionId;
+
+	public NextBillingDateEvent(UUID subscriptionId) {
+		super();
+		this.subscriptionId = subscriptionId;
+	}
+
+	public UUID getSubscriptionId() {
+		return subscriptionId;
+	}
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
new file mode 100644
index 0000000..a8a1b9e
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
+import com.ning.billing.lifecycle.KillbillService;
+import com.ning.billing.lifecycle.LifecycleHandlerType;
+import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class NextBillingDateNotifier implements KillbillService {
+    private final static Logger log = LoggerFactory.getLogger(NextBillingDateNotifier.class);
+
+    private static final String NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME = "next-billing-date-notifier";
+    private static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+    
+    private final Bus eventBus;
+	private boolean stoppedNotificationThread;
+	private boolean startedNotificationThread;
+    private final NotificationQueueService notificationQueueService;
+    private NotificationQueue nextBillingQueue;
+	private InvoiceConfig config;
+
+	@Inject
+	public NextBillingDateNotifier(NotificationQueueService notificationQueueService, Bus eventBus, InvoiceConfig config){
+		this.notificationQueueService = notificationQueueService;
+		this.config = config;
+		this.eventBus = eventBus;
+	}
+	
+	
+	@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+		try {
+            this.stoppedNotificationThread = false;
+            this.startedNotificationThread = false;
+            nextBillingQueue = notificationQueueService.createNotificationQueue(NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME,
+            		NEXT_BILLING_DATE_NOTIFIER_QUEUE,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey) {
+                	UUID subscriptionId;
+                	try {
+                		subscriptionId = UUID.fromString(notificationKey);
+                	} catch (IllegalArgumentException e) {
+                		log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID",e);
+                		return;
+                	}
+                    
+                    processEventReady(subscriptionId);
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isEventProcessingOff();
+                }
+                @Override
+                public long getNotificationSleepTimeMs() {
+                    return config.getNotificationSleepTimeMs();
+                }
+                @Override
+                public int getDaoMaxReadyEvents() {
+                    return config.getDaoMaxReadyEvents();
+                }
+                @Override
+                public long getDaoClaimTimeMs() {
+                    return config.getDaoMaxReadyEvents();
+                }
+            });
+        } catch (NotficationQueueAlreadyExists e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+    	nextBillingQueue.startQueue();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() {
+        if (nextBillingQueue != null) {
+        	nextBillingQueue.stopQueue();
+        }
+        startedNotificationThread = false;
+    }
+     
+	@Override
+	public String getName() {
+		return NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME;
+	}
+	
+    public void processEventReady(UUID subscriptionId) {
+        try {
+            eventBus.post(new NextBillingDateEvent(subscriptionId));
+        } catch (EventBusException e) {
+            log.error("Failed to post entitlement event " + subscriptionId, e);
+        }
+    }
+
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 268aaf5..7d41ed8 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -24,7 +24,7 @@ import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.invoice.glue.InvoiceModuleMock;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
 
 import static org.testng.Assert.fail;
 
@@ -47,7 +47,7 @@ public abstract class InvoiceDaoTestBase {
 
             invoiceItemDao = module.getInvoiceItemDao();
 
-            EventBusService busService = injector.getInstance(EventBusService.class);
+            BusService busService = injector.getInstance(BusService.class);
             ((DefaultEventBusService) busService).startBus();
         }
         catch (Throwable t) {
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
index afcae5e..01ad9b8 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
@@ -20,14 +20,14 @@ import com.google.inject.Inject;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 
-public class DefaultEventBusService implements EventBusService {
+public class DefaultEventBusService implements BusService {
 
     private final static String EVENT_BUS_SERVICE = "eventbus-service";
 
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public DefaultEventBusService(EventBus eventBus) {
+    public DefaultEventBusService(Bus eventBus) {
         this.eventBus = eventBus;
     }
 
@@ -47,7 +47,7 @@ public class DefaultEventBusService implements EventBusService {
     }
 
     @Override
-    public EventBus getEventBus() {
+    public Bus getBus() {
         return eventBus;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
index 259736d..7312e5b 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class MemoryEventBus implements EventBus {
+public class MemoryEventBus implements Bus {
 
     // STEPH config ?
     private final static int MAX_EVENT_THREADS = 1;
@@ -92,13 +92,13 @@ public class MemoryEventBus implements EventBus {
     }
 
     @Override
-    public void post(EventBusNotification event) throws EventBusException {
+    public void post(BusEvent event) throws EventBusException {
         checkInitialized("post");
         delegate.post(event);
     }
 
     @Override
-    public void postFromTransaction(EventBusNotification event, Transmogrifier dao) throws EventBusException {
+    public void postFromTransaction(BusEvent event, Transmogrifier dao) throws EventBusException {
         checkInitialized("postFromTransaction");
         delegate.post(event);
     }
diff --git a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
index 078c331..8e6830e 100644
--- a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
@@ -18,16 +18,16 @@ package com.ning.billing.util.glue;
 
 import com.google.inject.AbstractModule;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.BusService;
 import com.ning.billing.util.eventbus.MemoryEventBus;
 
 public class EventBusModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(EventBusService.class).to(DefaultEventBusService.class);
-        bind(EventBus.class).to(MemoryEventBus.class).asEagerSingleton();
+        bind(BusService.class).to(DefaultEventBusService.class);
+        bind(Bus.class).to(MemoryEventBus.class).asEagerSingleton();
 
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 80f7385..2f18379 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -40,6 +40,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         this.dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
+
+
     @Override
     protected void doProcessEvents(int sequenceId) {
         List<Notification> notifications = getReadyNotifications(sequenceId);
@@ -116,4 +118,5 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         }
         return result;
     }
+    
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
new file mode 100644
index 0000000..4e771ba
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+public class NotificationError extends Error {
+
+    private static final long serialVersionUID = 131398536;
+
+    public NotificationError() {
+        super();
+    }
+
+    public NotificationError(String msg, Throwable arg1) {
+        super(msg, arg1);
+    }
+
+    public NotificationError(String msg) {
+        super(msg);
+    }
+
+    public NotificationError(Throwable msg) {
+        super(msg);
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 23f0de0..4ea38f7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -42,18 +42,17 @@ public interface NotificationQueue {
    public void processReadyNotification();
 
    /**
-    * Stops the queue.
+    * Stops the queue. Blocks until queue is completely stopped.
     *
     * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
     */
    public void stopQueue();
 
    /**
-    * Starts the queue.
+    * Starts the queue. Blocks until queue has completely started.
     *
     * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
     */
    public void startQueue();
 
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 6eaf33f..6a2810d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -17,34 +17,28 @@
 package com.ning.billing.util.notificationq;
 
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 
 public abstract class NotificationQueueBase implements NotificationQueue {
 
     protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
 
+    private static final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
+    private static final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
+    private static final long NANO_TO_MS = (1000 * 1000);
+
     protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
     protected final long STOP_WAIT_TIMEOUT_MS = 60000;
 
@@ -63,7 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     // Use this object's monitor for synchronization (no need for volatile)
     protected boolean isProcessingEvents;
-
+    
+    private boolean startedComplete = false;
+    private boolean stoppedComplete = false;
+    
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         this.clock = clock;
@@ -99,7 +96,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
     @Override
     public void stopQueue() {
         if (config.isNotificationProcessingOff()) {
-            handler.completedQueueStop();
+            completedQueueStop();
             return;
         }
 
@@ -113,7 +110,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                 log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
             }
         }
-
+        waitForNotificationStopCompletion();
     }
 
     @Override
@@ -125,7 +122,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
         if (config.isNotificationProcessingOff()) {
             log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
-            handler.completedQueueStart();
+            completedQueueStart();
             return;
         }
         final NotificationQueueBase notificationQueue = this;
@@ -139,7 +136,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                         Thread.currentThread().getId()));
 
                 // Thread is now started, notify the listener
-                handler.completedQueueStart();
+                completedQueueStart();
 
                 try {
                     while (true) {
@@ -171,7 +168,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                     // Just to make it really obvious in the log
                     e.printStackTrace();
                 } finally {
-                    handler.completedQueueStop();
+                    completedQueueStop();
                     log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
                             Thread.currentThread().getName(),
                             Thread.currentThread().getId()));
@@ -182,8 +179,59 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                 Thread.sleep(config.getNotificationSleepTimeMs());
             }
         });
+        waitForNotificationStartCompletion();
+    }
+    
+    private void completedQueueStop() {
+    	synchronized (this) {
+    		stoppedComplete = true;
+            this.notifyAll();
+        }
+    }
+    
+    private void completedQueueStart() {
+        synchronized (this) {
+        	startedComplete = true;
+            this.notifyAll();
+        }
     }
 
+    private void waitForNotificationStartCompletion() {
+        waitForNotificationEventCompletion(true);
+    }
+
+    private void waitForNotificationStopCompletion() {
+        waitForNotificationEventCompletion(false);
+    }
+
+    private void waitForNotificationEventCompletion(boolean startEvent) {
+
+        long ini = System.nanoTime();
+        synchronized(this) {
+            do {
+                if ((startEvent ? startedComplete : stoppedComplete)) {
+                    break;
+                }
+                try {
+                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
+                } catch (InterruptedException e ) {
+                    Thread.currentThread().interrupt();
+                    throw new NotificationError(e);
+                }
+            } while (!(startEvent ? startedComplete : stoppedComplete) &&
+                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
+
+            if (!(startEvent ? startedComplete : stoppedComplete)) {
+                log.error("Could not {} notification thread in {} msec !!!",
+                        (startEvent ? "start" : "stop"),
+                        MAX_NOTIFICATION_THREAD_WAIT_MS);
+                throw new NotificationError("Failed to start service!!");
+            }
+            log.info("Notification thread has been {} in {} ms",
+                    (startEvent ? "started" : "stopped"),
+                    (System.nanoTime() - ini) / NANO_TO_MS);
+        }
+    }
 
     protected String getFullQName() {
         return svcName + ":" +  queueName;
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index a18906b..c1feca1 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -23,21 +23,12 @@ public interface NotificationQueueService {
 
     public interface NotificationQueueHandler {
         /**
-         * Called when the Notification thread has been started
-         */
-        public void completedQueueStart();
-
-        /**
          * Called for each notification ready
          *
          * @param key the notification key associated to that notification entry
          */
         public void handleReadyNotification(String notificationKey);
-        /**
-         * Called right before the Notification thread is about to exit
-         */
-        public void completedQueueStop();
-    }
+     }
 
     public static final class NotficationQueueAlreadyExists extends Exception {
         private static final long serialVersionUID = 1541281L;
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index 4b0f4a2..0091ab6 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -28,7 +28,7 @@ public class TestEventBus {
 
     private static final Logger log = LoggerFactory.getLogger(TestEventBus.class);
 
-    private EventBus eventBus;
+    private Bus eventBus;
 
 
     @BeforeClass
@@ -42,7 +42,7 @@ public class TestEventBus {
         eventBus.stop();
     }
 
-    public static final class MyEvent implements EventBusNotification {
+    public static final class MyEvent implements BusEvent {
         String name;
         Long value;
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 2e3bb3c..8058561 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -17,7 +17,6 @@
 package com.ning.billing.util.notificationq;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -28,18 +27,11 @@ import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
@@ -49,8 +41,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.ning.billing.dbi.DBIProvider;
-import com.ning.billing.dbi.DbiConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
@@ -59,370 +49,243 @@ import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue {
-
-    private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
-
-    @Inject
-    private DBI dbi;
-
-    @Inject
-    MysqlTestingHelper helper;
-
-    @Inject
-    private Clock clock;
-
-    private DummySqlTest dao;
-
-   // private NotificationQueue queue;
-
-    private void startMysql() throws IOException, ClassNotFoundException, SQLException {
-        final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
-        final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
-        helper.startMysql();
-        helper.initDb(ddl);
-        helper.initDb(testDdl);
-    }
-
-    @BeforeSuite(alwaysRun = true)
-    public void setup() throws Exception {
-        startMysql();
-        dao = dbi.onDemand(DummySqlTest.class);
-    }
-
-    @BeforeTest
-    public void beforeTest() {
-        dbi.withHandle(new HandleCallback<Void>() {
-
-            @Override
-            public Void withHandle(Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                handle.execute("delete from dummy");
-                return null;
-            }
-        });
-        // Reset time to real value
-        ((ClockMock) clock).resetDeltaFromReality();
-    }
-
-
-
-    /**
-     * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSimpleQueueDisabled() throws InterruptedException {
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(true, 100, 1, 10000));
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-                // Do nothing
-            }
-        });
-        assertTrue(true);
-    }
-
-    /**
-     * Test that we can post a notification in the future from a transaction and get the notification
-     * callback with the correct key when the time is ready
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSimpleNotification() throws InterruptedException {
-
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                        synchronized (expectedNotifications) {
-                            expectedNotifications.put(notificationKey, Boolean.TRUE);
-                            expectedNotifications.notify();
-                        }
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(false, 100, 1, 10000));
-
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
-                final UUID key = UUID.randomUUID();
-                final DummyObject obj = new DummyObject("foo", key);
-                final DateTime now = new DateTime();
-                final DateTime readyTime = now.plusMillis(2000);
-                final NotificationKey notificationKey = new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return key.toString();
-                    }
-                };
-                expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
-
-                // Insert dummy to be processed in 2 sec'
-                dao.inTransaction(new Transaction<Void, DummySqlTest>() {
-                    @Override
-                    public Void inTransaction(DummySqlTest transactional,
-                            TransactionStatus status) throws Exception {
-
-                        transactional.insertDummy(obj);
-                        readyQueue.recordFutureNotificationFromTransaction(transactional,
-                                readyTime, notificationKey);
-                        return null;
-                    }
-                });
-
-                // Move time in the future after the notification effectiveDate
-                ((ClockMock) clock).setDeltaFromReality(3000);
-
-                // Notification should have kicked but give it at least a sec' for thread scheduling
-                int nbTry = 1;
-                boolean success = false;
-                do {
-                    synchronized(expectedNotifications) {
-                        if (expectedNotifications.get(notificationKey.toString())) {
-                            success = true;
-                            break;
-                        }
-                        expectedNotifications.wait(1000);
-                    }
-                } while (nbTry-- > 0);
-                assertEquals(success, true);
-            }
-        });
-    }
-
-    @Test
-    public void testManyNotifications() throws InterruptedException {
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                        synchronized (expectedNotifications) {
-                            expectedNotifications.put(notificationKey, Boolean.TRUE);
-                            expectedNotifications.notify();
-                        }
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(false, 100, 10, 10000));
-
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
-                final DateTime now = clock.getUTCNow();
-                final int MAX_NOTIFICATIONS = 100;
-                for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
-
-                    final int nextReadyTimeIncrementMs = 1000;
-
-                    final UUID key = UUID.randomUUID();
-                    final DummyObject obj = new DummyObject("foo", key);
-                    final int currentIteration = i;
-
-                    final NotificationKey notificationKey = new NotificationKey() {
-                        @Override
-                        public String toString() {
-                            return key.toString();
-                        }
-                    };
-                    expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
-                    dao.inTransaction(new Transaction<Void, DummySqlTest>() {
-                        @Override
-                        public Void inTransaction(DummySqlTest transactional,
-                                TransactionStatus status) throws Exception {
-
-                            transactional.insertDummy(obj);
-                            readyQueue.recordFutureNotificationFromTransaction(transactional,
-                                    now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
-                            return null;
-                        }
-                    });
-
-                    // Move time in the future after the notification effectiveDate
-                    if (i == 0) {
-                        ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
-                    } else {
-                        ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
-                    }
-                }
-
-                // Wait a little longer since there are a lot of callback that need to happen
-                int nbTry = MAX_NOTIFICATIONS + 1;
-                boolean success = false;
-                do {
-                    synchronized(expectedNotifications) {
-
-                        Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
-                            @Override
-                            public boolean apply(Boolean input) {
-                                return input;
-                            }
-                        });
-
-                        if (completed.size() == MAX_NOTIFICATIONS) {
-                            success = true;
-                            break;
-                        }
-                        //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
-                        expectedNotifications.wait(1000);
-                    }
-                } while (nbTry-- > 0);
-                assertEquals(success, true);
-            }
-        });
-    }
-
-
-    NotificationConfig getNotificationConfig(final boolean off,
-            final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
-        return new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return off;
-            }
-            @Override
-            public long getNotificationSleepTimeMs() {
-                return sleepTime;
-            }
-            @Override
-            public int getDaoMaxReadyEvents() {
-                return maxReadyEvents;
-            }
-            @Override
-            public long getDaoClaimTimeMs() {
-                return claimTimeMs;
-            }
-        };
-    }
-
-    private static class TestStartStop {
-        private boolean started;
-        private boolean stopped;
-
-        public TestStartStop(boolean started, boolean stopped) {
-            super();
-            this.started = started;
-            this.stopped = stopped;
-        }
-
-        public void started() {
-            synchronized(this) {
-                started = true;
-                notify();
-            }
-        }
-
-        public void stopped() {
-            synchronized(this) {
-                stopped = true;
-                notify();
-            }
-        }
-
-        public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
-            return waitForEventCompletion(timeoutMs, true);
-        }
-
-        public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
-            return waitForEventCompletion(timeoutMs, false);
-        }
-
-        private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
-            DateTime init = new DateTime();
-            synchronized(this) {
-                while (! ((start ? started : stopped))) {
-                    wait(timeoutMs);
-                    if (init.plusMillis(timeoutMs).isAfterNow()) {
-                        break;
-                    }
-                }
-            }
-            return (start ? started : stopped);
-        }
-    }
-
-    private interface WithTest {
-        public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
-    }
-
-    private void executeTest(final TestStartStop testStartStop,
-            DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
-
-        queue.startQueue();
-        boolean started = testStartStop.waitForStartComplete(3000);
-        assertEquals(started, true);
-
-        test.test(queue);
-
-        queue.stopQueue();
-        boolean stopped = testStartStop.waitForStopComplete(3000);
-        assertEquals(stopped, true);
-    }
-
-
-    public static class TestNotificationQueueModule extends AbstractModule {
-        @Override
-        protected void configure() {
-
-            bind(Clock.class).to(ClockMock.class);
-
-            final MysqlTestingHelper helper = new MysqlTestingHelper();
-            bind(MysqlTestingHelper.class).toInstance(helper);
-            DBI dbi = helper.getDBI();
-            bind(DBI.class).toInstance(dbi);
-            /*
+	@Inject
+	private DBI dbi;
+
+	@Inject
+	MysqlTestingHelper helper;
+
+	@Inject
+	private Clock clock;
+
+	private DummySqlTest dao;
+
+	// private NotificationQueue queue;
+
+	private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+		final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+		final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
+		helper.startMysql();
+		helper.initDb(ddl);
+		helper.initDb(testDdl);
+	}
+
+	@BeforeSuite(alwaysRun = true)
+	public void setup() throws Exception {
+		startMysql();
+		dao = dbi.onDemand(DummySqlTest.class);
+	}
+
+	@BeforeTest
+	public void beforeTest() {
+		dbi.withHandle(new HandleCallback<Void>() {
+
+			@Override
+			public Void withHandle(Handle handle) throws Exception {
+				handle.execute("delete from notifications");
+				handle.execute("delete from claimed_notifications");
+				handle.execute("delete from dummy");
+				return null;
+			}
+		});
+		// Reset time to real value
+		((ClockMock) clock).resetDeltaFromReality();
+	}
+
+
+
+	/**
+	 * Test that we can post a notification in the future from a transaction and get the notification
+	 * callback with the correct key when the time is ready
+	 *
+	 * @throws InterruptedException
+	 */
+	@Test
+	public void testSimpleNotification() throws InterruptedException {
+
+		final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
+				new NotificationQueueHandler() {
+			@Override
+			public void handleReadyNotification(String notificationKey) {
+				synchronized (expectedNotifications) {
+					expectedNotifications.put(notificationKey, Boolean.TRUE);
+					expectedNotifications.notify();
+				}
+			}
+		},
+		getNotificationConfig(false, 100, 1, 10000));
+
+
+		queue.startQueue();
+
+		final UUID key = UUID.randomUUID();
+		final DummyObject obj = new DummyObject("foo", key);
+		final DateTime now = new DateTime();
+		final DateTime readyTime = now.plusMillis(2000);
+		final NotificationKey notificationKey = new NotificationKey() {
+			@Override
+			public String toString() {
+				return key.toString();
+			}
+		};
+		expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+		// Insert dummy to be processed in 2 sec'
+		dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+			@Override
+			public Void inTransaction(DummySqlTest transactional,
+					TransactionStatus status) throws Exception {
+
+				transactional.insertDummy(obj);
+				queue.recordFutureNotificationFromTransaction(transactional,
+						readyTime, notificationKey);
+				return null;
+			}
+		});
+
+		// Move time in the future after the notification effectiveDate
+		((ClockMock) clock).setDeltaFromReality(3000);
+
+		// Notification should have kicked but give it at least a sec' for thread scheduling
+		int nbTry = 1;
+		boolean success = false;
+		do {
+			synchronized(expectedNotifications) {
+				if (expectedNotifications.get(notificationKey.toString())) {
+					success = true;
+					break;
+				}
+				expectedNotifications.wait(1000);
+			}
+		} while (nbTry-- > 0);
+		assertEquals(success, true);
+	}
+
+	@Test
+	public void testManyNotifications() throws InterruptedException {
+		final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+				new NotificationQueueHandler() {
+			@Override
+			public void handleReadyNotification(String notificationKey) {
+				synchronized (expectedNotifications) {
+					expectedNotifications.put(notificationKey, Boolean.TRUE);
+					expectedNotifications.notify();
+				}
+			}
+		},
+		getNotificationConfig(false, 100, 10, 10000));
+
+
+		queue.startQueue();
+
+		final DateTime now = clock.getUTCNow();
+		final int MAX_NOTIFICATIONS = 100;
+		for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+			final int nextReadyTimeIncrementMs = 1000;
+
+			final UUID key = UUID.randomUUID();
+			final DummyObject obj = new DummyObject("foo", key);
+			final int currentIteration = i;
+
+			final NotificationKey notificationKey = new NotificationKey() {
+				@Override
+				public String toString() {
+					return key.toString();
+				}
+			};
+			expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+			dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+				@Override
+				public Void inTransaction(DummySqlTest transactional,
+						TransactionStatus status) throws Exception {
+
+					transactional.insertDummy(obj);
+					queue.recordFutureNotificationFromTransaction(transactional,
+							now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+					return null;
+				}
+			});
+
+			// Move time in the future after the notification effectiveDate
+			if (i == 0) {
+				((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+			} else {
+				((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+			}
+		}
+
+		// Wait a little longer since there are a lot of callback that need to happen
+		int nbTry = MAX_NOTIFICATIONS + 1;
+		boolean success = false;
+		do {
+			synchronized(expectedNotifications) {
+
+				Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+					@Override
+					public boolean apply(Boolean input) {
+						return input;
+					}
+				});
+
+				if (completed.size() == MAX_NOTIFICATIONS) {
+					success = true;
+					break;
+				}
+				//log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+				expectedNotifications.wait(1000);
+			}
+		} while (nbTry-- > 0);
+		assertEquals(success, true);
+
+	}
+
+	NotificationConfig getNotificationConfig(final boolean off,
+			final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+		return new NotificationConfig() {
+			@Override
+			public boolean isNotificationProcessingOff() {
+				return off;
+			}
+			@Override
+			public long getNotificationSleepTimeMs() {
+				return sleepTime;
+			}
+			@Override
+			public int getDaoMaxReadyEvents() {
+				return maxReadyEvents;
+			}
+			@Override
+			public long getDaoClaimTimeMs() {
+				return claimTimeMs;
+			}
+		};
+	}
+
+
+	public static class TestNotificationQueueModule extends AbstractModule {
+		@Override
+		protected void configure() {
+
+			bind(Clock.class).to(ClockMock.class);
+
+			final MysqlTestingHelper helper = new MysqlTestingHelper();
+			bind(MysqlTestingHelper.class).toInstance(helper);
+			DBI dbi = helper.getDBI();
+			bind(DBI.class).toInstance(dbi);
+			/*
             bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
             final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
             bind(DbiConfig.class).toInstance(config);
-            */
-        }
-    }
+			 */
+		}
+	}
 
 
 }