killbill-aplcache

Changes

invoice/pom.xml 5(+5 -0)

Details

diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index c4671ea..587a72a 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -30,16 +30,16 @@ import com.ning.billing.account.api.user.DefaultAccountChangeNotification;
 import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
 import com.ning.billing.util.customfield.CustomField;
 import com.ning.billing.util.customfield.dao.FieldStoreDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.dao.TagStoreDao;
 
 public class DefaultAccountDao implements AccountDao {
     private final AccountSqlDao accountDao;
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public DefaultAccountDao(IDBI dbi, EventBus eventBus) {
+    public DefaultAccountDao(IDBI dbi, Bus eventBus) {
         this.eventBus = eventBus;
         this.accountDao = dbi.onDemand(AccountSqlDao.class);
     }
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index b18a41b..9dcce76 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -16,18 +16,17 @@
 
 package com.ning.billing.account.dao;
 
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
+import org.skife.jdbi.v2.IDBI;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.account.glue.AccountModuleMock;
+import com.ning.billing.util.eventbus.BusService;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
-import org.apache.commons.io.IOUtils;
-import org.skife.jdbi.v2.IDBI;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import java.io.IOException;
 
 import static org.testng.Assert.fail;
 
@@ -42,12 +41,10 @@ public abstract class AccountDaoTestBase {
         try {
             module = new AccountModuleMock();
             final String accountDdl = IOUtils.toString(AccountSqlDao.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
-            final String invoiceDdl = IOUtils.toString(AccountSqlDao.class.getResourceAsStream("/com/ning/billing/invoice/ddl.sql"));
             final String utilDdl = IOUtils.toString(AccountSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
 
             module.startDb();
             module.initDb(accountDdl);
-            module.initDb(invoiceDdl);
             module.initDb(utilDdl);
 
             final Injector injector = Guice.createInjector(Stage.DEVELOPMENT, module);
@@ -56,7 +53,7 @@ public abstract class AccountDaoTestBase {
             accountDao = injector.getInstance(AccountDao.class);
             accountDao.test();
 
-            EventBusService busService = injector.getInstance(EventBusService.class);
+            BusService busService = injector.getInstance(BusService.class);
             ((DefaultEventBusService) busService).startBus();
         }
         catch (Throwable t) {
diff --git a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
index a08e3ab..e537841 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/api/AnalyticsService.java
@@ -19,7 +19,7 @@ package com.ning.billing.analytics.api;
 import com.google.inject.Inject;
 import com.ning.billing.analytics.AnalyticsListener;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,10 +30,10 @@ public class AnalyticsService implements IAnalyticsService
     private static final String ANALYTICS_SERVICE = "analytics-service";
 
     private final AnalyticsListener listener;
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public AnalyticsService(final AnalyticsListener listener, final EventBus eventBus)
+    public AnalyticsService(final AnalyticsListener listener, final Bus eventBus)
     {
         this.listener = listener;
         this.eventBus = eventBus;
@@ -51,7 +51,7 @@ public class AnalyticsService implements IAnalyticsService
         try {
             eventBus.register(listener);
         }
-        catch (EventBus.EventBusException e) {
+        catch (Bus.EventBusException e) {
             log.error("Unable to register to the EventBus!", e);
         }
     }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index c146de7..2feffda 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -47,7 +47,7 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import com.ning.billing.util.tag.DefaultTag;
 import com.ning.billing.util.tag.DefaultTagDescription;
 import com.ning.billing.util.tag.Tag;
@@ -90,7 +90,7 @@ public class TestAnalyticsService
     private AnalyticsService service;
 
     @Inject
-    private EventBus bus;
+    private Bus bus;
 
     @Inject
     private BusinessSubscriptionTransitionDao subscriptionDao;
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
index 9e5f254..2bc40f8 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountChangeNotification.java
@@ -16,12 +16,12 @@
 
 package com.ning.billing.account.api;
 
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 
 import java.util.List;
 import java.util.UUID;
 
-public interface AccountChangeNotification extends EventBusNotification {
+public interface AccountChangeNotification extends BusEvent {
     public UUID getAccountId();
 
     public List<ChangedField> getChangedFields();
diff --git a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
index bc2c065..22a1752 100644
--- a/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/account/api/AccountCreationNotification.java
@@ -16,11 +16,11 @@
 
 package com.ning.billing.account.api;
 
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 
 import java.util.UUID;
 
-public interface AccountCreationNotification extends EventBusNotification {
+public interface AccountCreationNotification extends BusEvent {
     public UUID getId();
 
     public AccountData getData();
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
new file mode 100644
index 0000000..a2b4270
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+public interface InvoiceConfig {
+
+    @Config("killbill.invoice.dao.claim.time")
+    @Default("60000")
+    public long getDaoClaimTimeMs();
+
+    @Config("killbill.invoice.dao.ready.max")
+    @Default("10")
+    public int getDaoMaxReadyEvents();
+
+    @Config("killbill.invoice.engine.notifications.sleep")
+    @Default("500")
+    public long getNotificationSleepTimeMs();
+
+    @Config("killbill.invoice.engine.events.off")
+    @Default("false")
+    public boolean isEventProcessingOff();
+}
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
index 26ce81f..449c368 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionTransition.java
@@ -19,12 +19,12 @@ package com.ning.billing.entitlement.api.user;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 import org.joda.time.DateTime;
 
 import java.util.UUID;
 
-public interface SubscriptionTransition extends EventBusNotification {
+public interface SubscriptionTransition extends BusEvent {
 
     public enum SubscriptionTransitionType {
         MIGRATE_ENTITLEMENT,
diff --git a/api/src/main/java/com/ning/billing/invoice/api/Invoice.java b/api/src/main/java/com/ning/billing/invoice/api/Invoice.java
index f69b934..0d13363 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/Invoice.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/Invoice.java
@@ -25,14 +25,22 @@ import java.util.List;
 import java.util.UUID;
 
 public interface Invoice extends Entity {
-    boolean add(InvoiceItem item);
+    boolean addInvoiceItem(InvoiceItem item);
 
-    boolean add(List<InvoiceItem> items);
+    boolean addInvoiceItems(List<InvoiceItem> items);
 
-    List<InvoiceItem> getItems();
+    List<InvoiceItem> getInvoiceItems();
 
     int getNumberOfItems();
 
+    boolean addPayment(InvoicePayment payment);
+
+    boolean addPayments(List<InvoicePayment> payments);
+
+    List<InvoicePayment> getPayments();
+
+    int getNumberOfPayments();
+
     UUID getAccountId();
 
     DateTime getInvoiceDate();
@@ -47,7 +55,7 @@ public interface Invoice extends Entity {
 
     BigDecimal getTotalAmount();
 
-    BigDecimal getAmountOutstanding();
+    BigDecimal getBalance();
 
     boolean isDueForPayment(DateTime targetDate, int numberOfDays);
 }
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
index 89c0d87..7903647 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceCreationNotification.java
@@ -17,13 +17,13 @@
 package com.ning.billing.invoice.api;
 
 import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.util.eventbus.EventBusNotification;
+import com.ning.billing.util.eventbus.BusEvent;
 import org.joda.time.DateTime;
 
 import java.math.BigDecimal;
 import java.util.UUID;
 
-public interface InvoiceCreationNotification extends EventBusNotification {
+public interface InvoiceCreationNotification extends BusEvent {
     public UUID getInvoiceId();
     public UUID getAccountId();
     public BigDecimal getAmountOwed();
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoicePayment.java b/api/src/main/java/com/ning/billing/invoice/api/InvoicePayment.java
new file mode 100644
index 0000000..a4acadb
--- /dev/null
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoicePayment.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.api;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+import org.joda.time.DateTime;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.util.entity.Entity;
+
+public interface InvoicePayment extends Entity {
+    UUID getInvoiceId();
+
+    DateTime getPaymentDate();
+
+    BigDecimal getAmount();
+
+    Currency getCurrency();
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index d8a0100..05a063d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -111,7 +111,7 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
                     // Not implemented yet
                     break;
                 case STANDALONE:
-                    // Not implemented yet
+                    data = createStandaloneSubscriptionMigrationData(bundleData.getId(), curSub.getCategory(), curSub.getSubscriptionCases(), now);
                     break;
                 default:
                     throw new EntitlementMigrationApiException(String.format("Unkown product type ", curSub.getCategory()));
@@ -144,6 +144,22 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, events));
     }
 
+    private SubscriptionMigrationData createStandaloneSubscriptionMigrationData(UUID bundleId, ProductCategory productCategory,
+            EntitlementSubscriptionMigrationCase [] input, DateTime now)
+    throws EntitlementMigrationApiException {
+        TimedMigration [] events = migrationAligner.getEventsMigration(input, now);
+        DateTime migrationStartDate= events[0].getEventTime();
+        List<EntitlementEvent> emptyEvents =  Collections.emptyList();
+        SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
+            .setId(UUID.randomUUID())
+            .setBundleId(bundleId)
+            .setCategory(productCategory)
+            .setBundleStartDate(migrationStartDate)
+            .setStartDate(migrationStartDate),
+            emptyEvents);
+        return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, events));
+    }
+
     private List<EntitlementEvent> toEvents(SubscriptionData subscriptionData, DateTime now, TimedMigration [] migrationEvents) {
 
         List<EntitlementEvent> events = new ArrayList<EntitlementEvent>(migrationEvents.length);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
index 19b851d..2843ec1 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
@@ -19,6 +19,7 @@ package com.ning.billing.entitlement.api.test;
 import com.google.inject.Inject;
 import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -26,6 +27,7 @@ import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotifi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import java.util.UUID;
 
 public class DefaultEntitlementTestApi implements EntitlementTestApi {
@@ -43,11 +45,13 @@ public class DefaultEntitlementTestApi implements EntitlementTestApi {
 
     @Override
     public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
+        if (recursive || oneEventOnly) {
+            throw new EntitlementError("Not implemented");
+        }
         if (config.isEventProcessingOff()) {
             log.warn("Running event processing loop");
             NotificationQueue queue = getNotificationQueue();
             queue.processReadyNotification();
-
         }
     }
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 7bd1124..86fd78b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -18,7 +18,6 @@ package com.ning.billing.entitlement.engine.core;
 
 import java.util.UUID;
 
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ import com.ning.billing.entitlement.exceptions.EntitlementError;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
 import com.ning.billing.util.notificationq.NotificationConfig;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -59,10 +58,6 @@ public class Engine implements EventListener, EntitlementService {
     public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
     public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
-    private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
-    private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
-    private final long NANO_TO_MS = (1000 * 1000);
-
     private final static Logger log = LoggerFactory.getLogger(Engine.class);
 
     private final Clock clock;
@@ -72,19 +67,17 @@ public class Engine implements EventListener, EntitlementService {
     private final EntitlementBillingApi billingApi;
     private final EntitlementTestApi testApi;
     private final EntitlementMigrationApi migrationApi;
-    private final EventBus eventBus;
+    private final Bus eventBus;
     private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
 
-    private boolean startedNotificationThread;
-    private boolean stoppedNotificationThread;
     private NotificationQueue subscritionEventQueue;
 
     @Inject
     public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
             EntitlementConfig config, DefaultEntitlementUserApi userApi,
             DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
-            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+            DefaultEntitlementMigrationApi migrationApi, Bus eventBus,
             NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
@@ -108,8 +101,6 @@ public class Engine implements EventListener, EntitlementService {
     public void initialize() {
 
         try {
-            this.stoppedNotificationThread = false;
-            this.startedNotificationThread = false;
             subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
                     NOTIFICATION_QUEUE_NAME,
                     new NotificationQueueHandler() {
@@ -122,21 +113,6 @@ public class Engine implements EventListener, EntitlementService {
                         processEventReady(event);
                     }
                 }
-
-                @Override
-                public void completedQueueStop() {
-                    synchronized (this) {
-                        stoppedNotificationThread = true;
-                        this.notifyAll();
-                    }
-                }
-                @Override
-                public void completedQueueStart() {
-                    synchronized (this) {
-                        startedNotificationThread = true;
-                        this.notifyAll();
-                    }
-                }
             },
             new NotificationConfig() {
                 @Override
@@ -164,16 +140,13 @@ public class Engine implements EventListener, EntitlementService {
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
         subscritionEventQueue.startQueue();
-        waitForNotificationStartCompletion();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
         if (subscritionEventQueue != null) {
             subscritionEventQueue.stopQueue();
-            waitForNotificationStopCompletion();
-        }
-        startedNotificationThread = false;
+         }
     }
 
     @Override
@@ -218,43 +191,6 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-    private void waitForNotificationStartCompletion() {
-        waitForNotificationEventCompletion(true);
-    }
-
-    private void waitForNotificationStopCompletion() {
-        waitForNotificationEventCompletion(false);
-    }
-
-    private void waitForNotificationEventCompletion(boolean startEvent) {
-
-        long ini = System.nanoTime();
-        synchronized(this) {
-            do {
-                if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
-                    break;
-                }
-                try {
-                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
-                } catch (InterruptedException e ) {
-                    Thread.currentThread().interrupt();
-                    throw new EntitlementError(e);
-                }
-            } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
-                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
-
-            if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
-                log.error("Could not {} notification thread in {} msec !!!",
-                        (startEvent ? "start" : "stop"),
-                        MAX_NOTIFICATION_THREAD_WAIT_MS);
-                throw new EntitlementError("Failed to start service!!");
-            }
-            log.info("Notification thread has been {} in {} ms",
-                    (startEvent ? "started" : "stopped"),
-                    (System.nanoTime() - ini) / NANO_TO_MS);
-        }
-    }
-
     private void insertNextPhaseEvent(SubscriptionData subscription) {
         try {
             DateTime now = clock.getUTCNow();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
index 6bee471..7241611 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/ApiTestListener.java
@@ -19,7 +19,7 @@ package com.ning.billing.entitlement.api;
 import com.google.common.base.Joiner;
 import com.google.common.eventbus.Subscribe;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,7 @@ public class ApiTestListener {
         PHASE
     }
 
-    public ApiTestListener(EventBus eventBus) {
+    public ApiTestListener(Bus eventBus) {
         this.nextExpectedEvent = new Stack<NextEvent>();
         this.completed = false;
     }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
index 68ad693..21b982b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
@@ -90,7 +90,7 @@ public class TestDefaultEntitlementBillingApi {
         ((DefaultCatalogService)catalogService).loadCatalog();
 	}
 	
-	@BeforeMethod
+	@BeforeMethod(alwaysRun=true)
 	public void setupEveryTime() {
 		bundles = new ArrayList<SubscriptionBundle>();
 		final SubscriptionBundle bundle = new SubscriptionBundleData( zeroId,"TestKey", oneId,  new DateTime().minusDays(4));
@@ -98,13 +98,11 @@ public class TestDefaultEntitlementBillingApi {
 		
 		
 		transitions = new ArrayList<SubscriptionTransition>();
-		
-		
 		subscriptions = new ArrayList<Subscription>();
 		
 		SubscriptionBuilder builder = new SubscriptionBuilder();
 		subscriptionStartDate = new DateTime().minusDays(3);
-		builder.setStartDate(subscriptionStartDate);
+		builder.setStartDate(subscriptionStartDate).setId(oneId);
 		subscription = new SubscriptionData(builder) {
 		    public List<SubscriptionTransition> getAllTransitions() {
 		    	return transitions;
@@ -138,13 +136,11 @@ public class TestDefaultEntitlementBillingApi {
 			public SubscriptionBundle getSubscriptionBundleFromId(UUID bundleId) {
 				return bundle;
 			}
-
-
 		};
 
 	}
 	
-	@Test
+    @Test(enabled=true, groups="fast")
 	public void testBillingEventsEmpty() {
 		EntitlementDao dao = new BrainDeadMockEntitlementDao() {
 			public List<SubscriptionBundle> getSubscriptionBundleForAccount(
@@ -164,7 +160,7 @@ public class TestDefaultEntitlementBillingApi {
 		Assert.assertEquals(events.size(), 0);
 	}
 	
-	@Test
+    @Test(enabled=true, groups="fast")
 	public void testBillingEventsNoBillingPeriod() throws CatalogApiException {
 		DateTime now = clock.getUTCNow();
 		DateTime then = now.minusDays(1);
@@ -189,7 +185,7 @@ public class TestDefaultEntitlementBillingApi {
 		checkFirstEvent(events, nextPlan, 32, oneId, now, nextPhase, ApiEventType.CREATE.toString());
 	}
 
-	@Test
+    @Test(enabled=true, groups="fast")
 	public void testBillingEventsAnual() throws CatalogApiException {
 		DateTime now = clock.getUTCNow();
 		DateTime then = now.minusDays(1);
@@ -214,11 +210,11 @@ public class TestDefaultEntitlementBillingApi {
 		checkFirstEvent(events, nextPlan, subscription.getStartDate().getDayOfMonth(), oneId, now, nextPhase, ApiEventType.CREATE.toString());
 	}
 	
-	@Test
+    @Test(enabled=true, groups="fast")
 	public void testBillingEventsMonthly() throws CatalogApiException {
 		DateTime now = clock.getUTCNow();
 		DateTime then = now.minusDays(1);
-		Plan nextPlan = catalogService.getFullCatalog().findPlan("shotgun-annual", now);
+		Plan nextPlan = catalogService.getFullCatalog().findPlan("shotgun-monthly", now);
 		PlanPhase nextPhase = nextPlan.getAllPhases()[1];
 		String nextPriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
 		SubscriptionTransition t = new SubscriptionTransitionData(
@@ -239,11 +235,11 @@ public class TestDefaultEntitlementBillingApi {
 		checkFirstEvent(events, nextPlan, 32, oneId, now, nextPhase, ApiEventType.CREATE.toString());
 	}
 	
-	@Test
+    @Test(enabled=true, groups="fast")
 	public void testBillingEventsAddOn() throws CatalogApiException {
 		DateTime now = clock.getUTCNow();
 		DateTime then = now.minusDays(1);
-		Plan nextPlan = catalogService.getFullCatalog().findPlan("shotgun-annual", now);
+		Plan nextPlan = catalogService.getFullCatalog().findPlan("laser-scope-monthly", now);
 		PlanPhase nextPhase = nextPlan.getAllPhases()[0];
 		String nextPriceList = PriceListSet.DEFAULT_PRICELIST_NAME;
 		SubscriptionTransition t = new SubscriptionTransitionData(
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 9a9d80b..adcf53f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -62,7 +62,7 @@ import com.ning.billing.lifecycle.KillbillService.ServiceException;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -86,7 +86,7 @@ public abstract class TestApiBase {
     protected EntitlementConfig config;
     protected EntitlementDao dao;
     protected ClockMock clock;
-    protected EventBusService busService;
+    protected BusService busService;
 
     protected AccountData accountData;
     protected Catalog catalog;
@@ -108,7 +108,7 @@ public abstract class TestApiBase {
     @AfterClass(groups={"setup"})
     public void tearDown() {
         try {
-            busService.getEventBus().register(testListener);
+            busService.getBus().register(testListener);
             ((DefaultEventBusService) busService).stopBus();
         } catch (Exception e) {
             log.warn("Failed to tearDown test properly ", e);
@@ -124,7 +124,7 @@ public abstract class TestApiBase {
 
         entitlementService = g.getInstance(EntitlementService.class);
         catalogService = g.getInstance(CatalogService.class);
-        busService = g.getInstance(EventBusService.class);
+        busService = g.getInstance(BusService.class);
         config = g.getInstance(EntitlementConfig.class);
         dao = g.getInstance(EntitlementDao.class);
         clock = (ClockMock) g.getInstance(Clock.class);
@@ -151,7 +151,7 @@ public abstract class TestApiBase {
         assertNotNull(catalog);
 
 
-        testListener = new ApiTestListener(busService.getEventBus());
+        testListener = new ApiTestListener(busService.getBus());
         entitlementApi = entitlementService.getUserApi();
         billingApi = entitlementService.getBillingApi();
         migrationApi = entitlementService.getMigrationApi();
@@ -169,7 +169,7 @@ public abstract class TestApiBase {
         clock.resetDeltaFromReality();
         ((MockEntitlementDao) dao).reset();
         try {
-            busService.getEventBus().register(testListener);
+            busService.getBus().register(testListener);
             UUID accountId = UUID.randomUUID();
             bundle = entitlementApi.createBundleForAccount(accountId, "myDefaultBundle");
         } catch (Exception e) {

invoice/pom.xml 5(+5 -0)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index 76c7fdc..1001e68 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -99,6 +99,11 @@
             <artifactId>guice</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        
     </dependencies>
     <build>
     </build>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index 1629a14..a79ee36 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -23,7 +23,7 @@ import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.invoice.api.InvoiceUserApi;
 import com.ning.billing.invoice.dao.DefaultInvoiceDao;
 import com.ning.billing.invoice.dao.InvoiceDao;
-import com.ning.billing.util.eventbus.EventBus;
+import com.ning.billing.util.eventbus.Bus;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 49e1124..6b8e460 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -16,32 +16,29 @@
 
 package com.ning.billing.invoice.dao;
 
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
 import com.google.inject.Inject;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceCreationNotification;
 import com.ning.billing.invoice.api.InvoiceItem;
+import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
-import com.ning.billing.util.eventbus.EventBus;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
+import com.ning.billing.util.eventbus.Bus;
 
 public class DefaultInvoiceDao implements InvoiceDao {
     private final InvoiceSqlDao invoiceSqlDao;
     private final InvoiceItemSqlDao invoiceItemSqlDao;
 
-    private final EventBus eventBus;
-    private final static Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
+    private final Bus eventBus;
 
     @Inject
-    public DefaultInvoiceDao(final IDBI dbi, final EventBus eventBus) {
+    public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus) {
         this.invoiceSqlDao = dbi.onDemand(InvoiceSqlDao.class);
         this.invoiceItemSqlDao = dbi.onDemand(InvoiceItemSqlDao.class);
         this.eventBus = eventBus;
@@ -49,7 +46,27 @@ public class DefaultInvoiceDao implements InvoiceDao {
 
     @Override
     public List<Invoice> getInvoicesByAccount(final String accountId) {
-        return invoiceSqlDao.getInvoicesByAccount(accountId);
+        return invoiceSqlDao.inTransaction(new Transaction<List<Invoice>, InvoiceSqlDao>() {
+             @Override
+             public List<Invoice> inTransaction(final InvoiceSqlDao invoiceDao, final TransactionStatus status) throws Exception {
+                 List<Invoice> invoices = invoiceDao.getInvoicesByAccount(accountId);
+
+                 InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
+                 for (final Invoice invoice : invoices) {
+                     List<InvoiceItem> invoiceItems = invoiceItemDao.getInvoiceItemsByInvoice(invoice.getId().toString());
+                     invoice.addInvoiceItems(invoiceItems);
+                 }
+
+                 InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+                 for (final Invoice invoice : invoices) {
+                     String invoiceId = invoice.getId().toString();
+                     List<InvoicePayment> invoicePayments = invoicePaymentSqlDao.getPaymentsForInvoice(invoiceId);
+                     invoice.addPayments(invoicePayments);
+                 }
+
+                 return invoices;
+             }
+        });
     }
 
     @Override
@@ -65,9 +82,16 @@ public class DefaultInvoiceDao implements InvoiceDao {
                  List<Invoice> invoices = invoiceDao.get();
 
                  InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
-                 for (Invoice invoice : invoices) {
+                 for (final Invoice invoice : invoices) {
                      List<InvoiceItem> invoiceItems = invoiceItemDao.getInvoiceItemsByInvoice(invoice.getId().toString());
-                     invoice.add(invoiceItems);
+                     invoice.addInvoiceItems(invoiceItems);
+                 }
+
+                 InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+                 for (final Invoice invoice : invoices) {
+                     String invoiceId = invoice.getId().toString();
+                     List<InvoicePayment> invoicePayments = invoicePaymentSqlDao.getPaymentsForInvoice(invoiceId);
+                     invoice.addPayments(invoicePayments);
                  }
 
                  return invoices;
@@ -85,7 +109,11 @@ public class DefaultInvoiceDao implements InvoiceDao {
                  if (invoice != null) {
                      InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
                      List<InvoiceItem> invoiceItems = invoiceItemDao.getInvoiceItemsByInvoice(invoiceId);
-                     invoice.add(invoiceItems);
+                     invoice.addInvoiceItems(invoiceItems);
+
+                     InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+                     List<InvoicePayment> invoicePayments = invoicePaymentSqlDao.getPaymentsForInvoice(invoiceId);
+                     invoice.addPayments(invoicePayments);
                  }
 
                  return invoice;
@@ -95,62 +123,72 @@ public class DefaultInvoiceDao implements InvoiceDao {
 
     @Override
     public void create(final Invoice invoice) {
-         invoiceSqlDao.inTransaction(new Transaction<Void, InvoiceSqlDao>() {
-             @Override
-             public Void inTransaction(final InvoiceSqlDao invoiceDao, final TransactionStatus status) throws Exception {
+        invoiceSqlDao.inTransaction(new Transaction<Void, InvoiceSqlDao>() {
+            @Override
+            public Void inTransaction(final InvoiceSqlDao invoiceDao, final TransactionStatus status) throws Exception {
                 Invoice currentInvoice = invoiceDao.getById(invoice.getId().toString());
 
                 if (currentInvoice == null) {
                     invoiceDao.create(invoice);
 
-                    List<InvoiceItem> invoiceItems = invoice.getItems();
+                    List<InvoiceItem> invoiceItems = invoice.getInvoiceItems();
                     InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
                     invoiceItemDao.create(invoiceItems);
 
+                    List<InvoicePayment> invoicePayments = invoice.getPayments();
+                    InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+                    invoicePaymentSqlDao.create(invoicePayments);
+
                     InvoiceCreationNotification event;
                     event = new DefaultInvoiceCreationNotification(invoice.getId(), invoice.getAccountId(),
-                                                                  invoice.getAmountOutstanding(), invoice.getCurrency(),
+                                                                  invoice.getBalance(), invoice.getCurrency(),
                                                                   invoice.getInvoiceDate());
                     eventBus.post(event);
-
-
                 }
 
                 return null;
-             }
-         });
+            }
+        });
     }
 
     @Override
     public List<Invoice> getInvoicesBySubscription(final String subscriptionId) {
         return invoiceSqlDao.inTransaction(new Transaction<List<Invoice>, InvoiceSqlDao>() {
-             @Override
-             public List<Invoice> inTransaction(InvoiceSqlDao invoiceDao, TransactionStatus status) throws Exception {
-                 List<Invoice> invoices = invoiceDao.getInvoicesBySubscription(subscriptionId);
+            @Override
+            public List<Invoice> inTransaction(final InvoiceSqlDao invoiceDao, final TransactionStatus status) throws Exception {
+                List<Invoice> invoices = invoiceDao.getInvoicesBySubscription(subscriptionId);
+
+                InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
+                for (final Invoice invoice : invoices) {
+                    List<InvoiceItem> invoiceItems = invoiceItemDao.getInvoiceItemsByInvoice(invoice.getId().toString());
+                    invoice.addInvoiceItems(invoiceItems);
+                }
 
-                 InvoiceItemSqlDao invoiceItemDao = invoiceDao.become(InvoiceItemSqlDao.class);
-                 for (Invoice invoice : invoices) {
-                     List<InvoiceItem> invoiceItems = invoiceItemDao.getInvoiceItemsByInvoice(invoice.getId().toString());
-                     invoice.add(invoiceItems);
-                 }
+                InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+                for (final Invoice invoice : invoices) {
+                    String invoiceId = invoice.getId().toString();
+                    List<InvoicePayment> invoicePayments = invoicePaymentSqlDao.getPaymentsForInvoice(invoiceId);
+                    invoice.addPayments(invoicePayments);
+                }
 
-                 return invoices;
-             }
+                return invoices;
+            }
         });
     }
 
     @Override
-    public List<UUID> getInvoicesForPayment(Date targetDate, int numberOfDays) {
+    public List<UUID> getInvoicesForPayment(final Date targetDate, final int numberOfDays) {
         return invoiceSqlDao.getInvoicesForPayment(targetDate, numberOfDays);
     }
 
     @Override
-    public void notifySuccessfulPayment(String invoiceId, BigDecimal paymentAmount, String currency, String paymentId, Date paymentDate) {
+    public void notifySuccessfulPayment(final String invoiceId, final BigDecimal paymentAmount,
+                                        final String currency, final String paymentId, final Date paymentDate) {
         invoiceSqlDao.notifySuccessfulPayment(invoiceId, paymentAmount, currency, paymentId, paymentDate);
     }
 
     @Override
-    public void notifyFailedPayment(String invoiceId, String paymentId, Date paymentAttemptDate) {
+    public void notifyFailedPayment(final String invoiceId, final String paymentId, final Date paymentAttemptDate) {
         invoiceSqlDao.notifyFailedPayment(invoiceId, paymentId, paymentAttemptDate);
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
new file mode 100644
index 0000000..2efc98d
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.dao;
+
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.UUID;
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.BinderFactory;
+import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
+import org.skife.jdbi.v2.sqlobject.SqlBatch;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.invoice.api.InvoicePayment;
+import com.ning.billing.util.entity.EntityDao;
+
+@ExternalizedSqlViaStringTemplate3
+@RegisterMapper(InvoicePaymentSqlDao.InvoicePaymentMapper.class)
+public interface InvoicePaymentSqlDao extends EntityDao<InvoicePayment> {
+    @Override
+    @SqlUpdate
+    public void create(@InvoicePaymentBinder final InvoicePayment invoicePayment);
+
+    @SqlBatch
+    void create(@InvoicePaymentBinder final List<InvoicePayment> items);
+
+    @Override
+    @SqlUpdate
+    public void update(@InvoicePaymentBinder final InvoicePayment invoicePayment);
+
+    @SqlQuery
+    public List<InvoicePayment> getPaymentsForInvoice(@Bind("invoiceId") final String invoiceId);
+
+    public static class InvoicePaymentMapper implements ResultSetMapper<InvoicePayment> {
+        @Override
+        public InvoicePayment map(int index, ResultSet result, StatementContext context) throws SQLException {
+            final UUID id = UUID.fromString(result.getString("payment_id"));
+            final UUID invoiceId = UUID.fromString(result.getString("invoice_id"));
+            final DateTime paymentDate = new DateTime(result.getTimestamp("payment_date"));
+            final BigDecimal amount = result.getBigDecimal("amount");
+            final String currencyString = result.getString("currency");
+            final Currency currency = (currencyString == null) ? null : Currency.valueOf(currencyString);
+
+            return new InvoicePayment() {
+                @Override
+                public UUID getId() {
+                    return id;
+                }
+                @Override
+                public UUID getInvoiceId() {
+                    return invoiceId;
+                }
+                @Override
+                public DateTime getPaymentDate() {
+                    return paymentDate;
+                }
+                @Override
+                public BigDecimal getAmount() {
+                    return amount;
+                }
+                @Override
+                public Currency getCurrency() {
+                    return currency;
+                }
+            };
+        }
+    }
+
+    @BindingAnnotation(InvoicePaymentBinder.InvoicePaymentBinderFactory.class)
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target({ElementType.PARAMETER})
+    public @interface InvoicePaymentBinder {
+        public static class InvoicePaymentBinderFactory implements BinderFactory {
+            public Binder build(Annotation annotation) {
+                return new Binder<InvoicePaymentBinder, InvoicePayment>() {
+                    public void bind(SQLStatement q, InvoicePaymentBinder bind, InvoicePayment payment) {
+                        q.bind("invoiceId", payment.getInvoiceId().toString());
+                        q.bind("paymentId", payment.getId().toString());
+                        q.bind("paymentDate", payment.getAmount());
+                        q.bind("amount", payment.getAmount());
+                        Currency currency = payment.getCurrency();
+                        q.bind("currency", (currency == null) ? null : currency.toString());
+                    }
+                };
+            }
+        }
+    }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceSqlDao.java
index 9ede145..19ce07b 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.invoice.dao;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceItem;
+import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.model.DefaultInvoice;
 import com.ning.billing.util.UuidMapper;
 import com.ning.billing.util.entity.EntityDao;
@@ -93,10 +94,6 @@ public interface InvoiceSqlDao extends EntityDao<Invoice>, Transactional<Invoice
                         q.bind("accountId", invoice.getAccountId().toString());
                         q.bind("invoiceDate", invoice.getInvoiceDate().toDate());
                         q.bind("targetDate", invoice.getTargetDate().toDate());
-                        q.bind("amountPaid", invoice.getAmountPaid());
-                        q.bind("amountOutstanding", invoice.getAmountOutstanding());
-                        DateTime last_payment_date = invoice.getLastPaymentAttempt();
-                        q.bind("lastPaymentAttempt", last_payment_date == null ? null : last_payment_date.toDate());
                         q.bind("currency", invoice.getCurrency().toString());
                     }
                 };
@@ -111,15 +108,9 @@ public interface InvoiceSqlDao extends EntityDao<Invoice>, Transactional<Invoice
             UUID accountId = UUID.fromString(result.getString("account_id"));
             DateTime invoiceDate = new DateTime(result.getTimestamp("invoice_date"));
             DateTime targetDate = new DateTime(result.getTimestamp("target_date"));
-            BigDecimal amountPaid = result.getBigDecimal("amount_paid");
-            if (amountPaid == null) {
-                amountPaid = BigDecimal.ZERO;
-            }
-            Timestamp lastPaymentAttemptTimeStamp = result.getTimestamp("last_payment_attempt");
-            DateTime lastPaymentAttempt = lastPaymentAttemptTimeStamp == null ? null : new DateTime(lastPaymentAttemptTimeStamp);
             Currency currency = Currency.valueOf(result.getString("currency"));
 
-            return new DefaultInvoice(id, accountId, invoiceDate, targetDate, currency, lastPaymentAttempt, amountPaid, new ArrayList<InvoiceItem>());
+            return new DefaultInvoice(id, accountId, invoiceDate, targetDate, currency);
         }
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
index 97f82e0..ae10d41 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
@@ -19,6 +19,7 @@ package com.ning.billing.invoice.model;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceItem;
+import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.util.clock.DefaultClock;
 import org.joda.time.DateTime;
 
@@ -28,55 +29,65 @@ import java.util.List;
 import java.util.UUID;
 
 public class DefaultInvoice implements Invoice {
-    private final InvoiceItemList items = new InvoiceItemList();
+    private final InvoiceItemList invoiceItems = new InvoiceItemList();
+    private final List<InvoicePayment> payments = new ArrayList<InvoicePayment>();
     private final UUID id;
     private UUID accountId;
     private final DateTime invoiceDate;
     private final DateTime targetDate;
     private Currency currency;
-    private BigDecimal amountPaid;
-    private DateTime lastPaymentAttempt;
 
     public DefaultInvoice(UUID accountId, DateTime targetDate, Currency currency) {
-        this(UUID.randomUUID(), accountId, new DefaultClock().getUTCNow(), targetDate, currency, null, BigDecimal.ZERO, new ArrayList<InvoiceItem>());
+        this(UUID.randomUUID(), accountId, new DefaultClock().getUTCNow(), targetDate, currency);
     }
 
     public DefaultInvoice(UUID invoiceId, UUID accountId, DateTime invoiceDate, DateTime targetDate,
-                          Currency currency, DateTime lastPaymentAttempt, BigDecimal amountPaid) {
-        this(invoiceId, accountId, invoiceDate, targetDate, currency, lastPaymentAttempt, amountPaid, new ArrayList<InvoiceItem>());
-    }
-
-    public DefaultInvoice(UUID invoiceId, UUID accountId, DateTime invoiceDate, DateTime targetDate,
-                          Currency currency, DateTime lastPaymentAttempt, BigDecimal amountPaid,
-                          List<InvoiceItem> invoiceItems) {
+                          Currency currency) {
         this.id = invoiceId;
         this.accountId = accountId;
         this.invoiceDate = invoiceDate;
         this.targetDate = targetDate;
         this.currency = currency;
-        this.lastPaymentAttempt= lastPaymentAttempt;
-        this.amountPaid = amountPaid;
-        this.items.addAll(invoiceItems);
     }
 
     @Override
-    public boolean add(InvoiceItem item) {
-        return items.add(item);
+    public boolean addInvoiceItem(final InvoiceItem item) {
+        return invoiceItems.add(item);
     }
 
     @Override
-    public boolean add(List<InvoiceItem> items) {
-        return this.items.addAll(items);
+    public boolean addInvoiceItems(final List<InvoiceItem> items) {
+        return this.invoiceItems.addAll(items);
     }
 
     @Override
-    public List<InvoiceItem> getItems() {
-        return items;
+    public List<InvoiceItem> getInvoiceItems() {
+        return invoiceItems;
     }
 
     @Override
     public int getNumberOfItems() {
-        return items.size();
+        return invoiceItems.size();
+    }
+
+    @Override
+    public boolean addPayment(final InvoicePayment payment) {
+        return payments.add(payment);
+    }
+
+    @Override
+    public boolean addPayments(final List<InvoicePayment> payments) {
+        return this.payments.addAll(payments);
+    }
+
+    @Override
+    public List<InvoicePayment> getPayments() {
+        return payments;
+    }
+
+    @Override
+    public int getNumberOfPayments() {
+        return payments.size();
     }
 
     @Override
@@ -106,21 +117,40 @@ public class DefaultInvoice implements Invoice {
 
     @Override
     public DateTime getLastPaymentAttempt() {
+        DateTime lastPaymentAttempt = null;
+
+        for (final InvoicePayment paymentAttempt : payments) {
+            DateTime paymentAttemptDate = paymentAttempt.getPaymentDate();
+            if (lastPaymentAttempt == null) {
+                lastPaymentAttempt = paymentAttemptDate;
+            }
+
+            if (lastPaymentAttempt.isBefore(paymentAttemptDate)) {
+                lastPaymentAttempt = paymentAttemptDate;
+            }
+        }
+
         return lastPaymentAttempt;
     }
 
     @Override
     public BigDecimal getAmountPaid() {
+        BigDecimal amountPaid = BigDecimal.ZERO;
+        for (final InvoicePayment payment : payments) {
+            if (payment.getAmount() != null) {
+                amountPaid = amountPaid.add(payment.getAmount());
+            }
+        }
         return amountPaid;
     }
 
     @Override
     public BigDecimal getTotalAmount() {
-        return items.getTotalAmount();
+        return invoiceItems.getTotalAmount();
     }
 
     @Override
-    public BigDecimal getAmountOutstanding() {
+    public BigDecimal getBalance() {
         return getTotalAmount().subtract(getAmountPaid());
     }
 
@@ -130,6 +160,7 @@ public class DefaultInvoice implements Invoice {
             return false;
         }
 
+        DateTime lastPaymentAttempt = getLastPaymentAttempt();
         if (lastPaymentAttempt == null) {
             return true;
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
index 575604e..cb30d15 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
@@ -41,21 +41,24 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
     private static final Logger log = LoggerFactory.getLogger(DefaultInvoiceGenerator.class);
 
     @Override
-    public Invoice generateInvoice(final UUID accountId, final BillingEventSet events, final InvoiceItemList existingItems, final DateTime targetDate, final Currency targetCurrency) {
+    public Invoice generateInvoice(final UUID accountId, final BillingEventSet events,
+                                   final InvoiceItemList existingItems, final DateTime targetDate,
+                                   final Currency targetCurrency) {
         if (events == null) {return new DefaultInvoice(accountId, targetDate, targetCurrency);}
         if (events.size() == 0) {return new DefaultInvoice(accountId, targetDate, targetCurrency);}
 
         DefaultInvoice invoice = new DefaultInvoice(accountId, targetDate, targetCurrency);
         InvoiceItemList currentItems = generateInvoiceItems(events, invoice.getId(), targetDate, targetCurrency);
         InvoiceItemList itemsToPost = reconcileInvoiceItems(invoice.getId(), currentItems, existingItems);
-        invoice.add(itemsToPost);
+        invoice.addInvoiceItems(itemsToPost);
 
         return invoice;
     }
 
-    private InvoiceItemList reconcileInvoiceItems(final UUID invoiceId, final InvoiceItemList currentInvoiceItems, final InvoiceItemList existingInvoiceItems) {
+    private InvoiceItemList reconcileInvoiceItems(final UUID invoiceId, final InvoiceItemList currentInvoiceItems,
+                                                  final InvoiceItemList existingInvoiceItems) {
         InvoiceItemList currentItems = new InvoiceItemList();
-        for (InvoiceItem item : currentInvoiceItems) {
+        for (final InvoiceItem item : currentInvoiceItems) {
             currentItems.add(new DefaultInvoiceItem(item, invoiceId));
         }
 
@@ -66,9 +69,9 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
 
         List<InvoiceItem> existingItemsToRemove = new ArrayList<InvoiceItem>();
 
-        for (InvoiceItem currentItem : currentItems) {
+        for (final InvoiceItem currentItem : currentItems) {
             // see if there are any existing items that are covered by the current item
-            for (InvoiceItem existingItem : existingItems) {
+            for (final InvoiceItem existingItem : existingItems) {
                 if (currentItem.duplicates(existingItem)) {
                     currentItem.subtract(existingItem);
                     existingItemsToRemove.add(existingItem);
@@ -85,14 +88,15 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         currentItems.removeZeroDollarItems();
 
         // add existing items that aren't covered by current items as credit items
-        for (InvoiceItem existingItem : existingItems) {
+        for (final InvoiceItem existingItem : existingItems) {
             currentItems.add(existingItem.asCredit(invoiceId));
         }
 
         return currentItems;
     }
 
-    private InvoiceItemList generateInvoiceItems(BillingEventSet events, UUID invoiceId, DateTime targetDate, Currency targetCurrency) {
+    private InvoiceItemList generateInvoiceItems(final BillingEventSet events, final UUID invoiceId,
+                                                 final DateTime targetDate, final Currency targetCurrency) {
         InvoiceItemList items = new InvoiceItemList();
 
         // sort events; this relies on the sort order being by subscription id then start date
@@ -119,7 +123,8 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         return items;
     }
 
-    private void processEvent(UUID invoiceId, BillingEvent event, List<InvoiceItem> items, DateTime targetDate, Currency targetCurrency) {
+    private void processEvent(final UUID invoiceId, final BillingEvent event, final List<InvoiceItem> items,
+                              final DateTime targetDate, final Currency targetCurrency) {
     	try {
     		//TODO: Jeff getPrice() -> getRecurringPrice()
     		BigDecimal rate = event.getRecurringPrice(targetCurrency);
@@ -135,7 +140,8 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         }
     }
 
-    private void processEvents(UUID invoiceId, BillingEvent firstEvent, BillingEvent secondEvent, List<InvoiceItem> items, DateTime targetDate, Currency targetCurrency) {
+    private void processEvents(final UUID invoiceId, final BillingEvent firstEvent, final BillingEvent secondEvent,
+                               final List<InvoiceItem> items, final DateTime targetDate, final Currency targetCurrency) {
     	//TODO: Jeff getPrice() -> getRecurringPrice()
     	try {
     		BigDecimal rate = firstEvent.getRecurringPrice(targetCurrency);
@@ -151,14 +157,17 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         }
     }
 
-    private void addInvoiceItem(UUID invoiceId, List<InvoiceItem> items, BillingEvent event, DateTime billThroughDate, BigDecimal amount, BigDecimal rate, Currency currency) {
+    private void addInvoiceItem(final UUID invoiceId, final List<InvoiceItem> items, final BillingEvent event,
+                                final DateTime billThroughDate, final BigDecimal amount, final BigDecimal rate,
+                                final Currency currency) {
         if (!(amount.compareTo(BigDecimal.ZERO) == 0)) {
             DefaultInvoiceItem item = new DefaultInvoiceItem(invoiceId, event.getSubscription().getId(), event.getEffectiveDate(), billThroughDate, event.getDescription(), amount, rate, currency);
             items.add(item);
         }
     }
 
-    private BigDecimal calculateInvoiceItemAmount(BillingEvent event, DateTime targetDate, BigDecimal rate){
+    private BigDecimal calculateInvoiceItemAmount(final BillingEvent event, final DateTime targetDate,
+                                                  final BigDecimal rate){
         BillingMode billingMode = getBillingMode(event.getBillingMode());
         DateTime startDate = event.getEffectiveDate();
         int billingCycleDay = event.getBillCycleDay();
@@ -174,7 +183,8 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         }
     }
 
-    private BigDecimal calculateInvoiceItemAmount(BillingEvent firstEvent, BillingEvent secondEvent, DateTime targetDate, BigDecimal rate) {
+    private BigDecimal calculateInvoiceItemAmount(final BillingEvent firstEvent, final BillingEvent secondEvent,
+                                                  final DateTime targetDate, final BigDecimal rate) {
         BillingMode billingMode = getBillingMode(firstEvent.getBillingMode());
         DateTime startDate = firstEvent.getEffectiveDate();
         int billingCycleDay = firstEvent.getBillCycleDay();
@@ -192,7 +202,7 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         }
     }
 
-    private BillingMode getBillingMode(BillingModeType billingModeType) {
+    private BillingMode getBillingMode(final BillingModeType billingModeType) {
         switch (billingModeType) {
             case IN_ADVANCE:
                 return new InAdvanceBillingMode();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoicePayment.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoicePayment.java
new file mode 100644
index 0000000..ef1ed29
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoicePayment.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.model;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+import org.joda.time.DateTime;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.invoice.api.InvoicePayment;
+import com.ning.billing.util.entity.EntityBase;
+
+public class DefaultInvoicePayment extends EntityBase<InvoicePayment> implements InvoicePayment {
+    private final UUID invoiceId;
+    private final DateTime paymentDate;
+    private final BigDecimal amount;
+    private final Currency currency;
+
+    public DefaultInvoicePayment(final UUID invoiceId, final DateTime paymentDate,
+                                 final BigDecimal amount, final Currency currency) {
+        this(UUID.randomUUID(), invoiceId, paymentDate, amount, currency);
+    }
+
+    public DefaultInvoicePayment(final UUID id, final UUID invoiceId, final DateTime paymentDate,
+                                 final BigDecimal amount, final Currency currency) {
+        super(id);
+        this.amount = amount;
+        this.invoiceId = invoiceId;
+        this.paymentDate = paymentDate;
+        this.currency = currency;
+    }
+
+    @Override
+    public UUID getInvoiceId() {
+        return invoiceId;
+    }
+
+    @Override
+    public DateTime getPaymentDate() {
+        return paymentDate;
+    }
+
+    @Override
+    public BigDecimal getAmount() {
+        return amount;
+    }
+
+    @Override
+    public Currency getCurrency() {
+        return currency;
+    }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
new file mode 100644
index 0000000..ebf0b5b
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.lifecycle.KillbillService;
+import com.ning.billing.lifecycle.LifecycleHandlerType;
+import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.Bus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class DefaultNextBillingDateNotifier implements KillbillService, NextBillingDateNotifier {
+    private final static Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+    private static final String NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME = "next-billing-date-notifier";
+    private static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+    
+    private final Bus eventBus;
+    private final NotificationQueueService notificationQueueService;
+    private NotificationQueue nextBillingQueue;
+	private InvoiceConfig config;
+
+	@Inject
+	public DefaultNextBillingDateNotifier(NotificationQueueService notificationQueueService, Bus eventBus, InvoiceConfig config){
+		this.notificationQueueService = notificationQueueService;
+		this.config = config;
+		this.eventBus = eventBus;
+	}
+	
+	
+	@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+		try {
+            nextBillingQueue = notificationQueueService.createNotificationQueue(NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME,
+            		NEXT_BILLING_DATE_NOTIFIER_QUEUE,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey) {
+                	UUID subscriptionId;
+                	try {
+                		subscriptionId = UUID.fromString(notificationKey);
+                	} catch (IllegalArgumentException e) {
+                		log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID",e);
+                		return;
+                	}
+                    
+                    processEvent(subscriptionId);
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isEventProcessingOff();
+                }
+                @Override
+                public long getNotificationSleepTimeMs() {
+                    return config.getNotificationSleepTimeMs();
+                }
+                @Override
+                public int getDaoMaxReadyEvents() {
+                    return config.getDaoMaxReadyEvents();
+                }
+                @Override
+                public long getDaoClaimTimeMs() {
+                    return config.getDaoMaxReadyEvents();
+                }
+            });
+        } catch (NotficationQueueAlreadyExists e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+    	nextBillingQueue.startQueue();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() {
+        if (nextBillingQueue != null) {
+        	nextBillingQueue.stopQueue();
+        }
+    }
+     
+	@Override
+	public String getName() {
+		return NEXT_BILLING_DATE_NOTIFIER_SERVICE_NAME;
+	}
+	
+    private void processEvent(UUID subscriptionId) {
+        try {
+            eventBus.post(new NextBillingDateEvent(subscriptionId));
+        } catch (EventBusException e) {
+            log.error("Failed to post entitlement event " + subscriptionId, e);
+        }
+    }
+    
+    @Override
+    public void insertNextBillingNotification(Transmogrifier transactionalDao, final UUID subscriptionId, DateTime futureNotificationTime) {
+    	nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
+    		public String toString() {
+    			return subscriptionId.toString();
+    		}
+    	});
+    }
+
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
new file mode 100644
index 0000000..59ec8a2
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import com.ning.billing.util.eventbus.BusEvent;
+
+public class NextBillingDateEvent implements BusEvent{
+	private final UUID subscriptionId;
+
+	public NextBillingDateEvent(UUID subscriptionId) {
+		super();
+		this.subscriptionId = subscriptionId;
+	}
+
+	public UUID getSubscriptionId() {
+		return subscriptionId;
+	}
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
new file mode 100644
index 0000000..5371824
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+public interface NextBillingDateNotifier {
+
+	void insertNextBillingNotification(Transmogrifier transactionalDao,
+			UUID subscriptionId, DateTime futureNotificationTime);
+
+}
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
new file mode 100644
index 0000000..ef91849
--- /dev/null
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoicePaymentSqlDao.sql.stg
@@ -0,0 +1,34 @@
+group InvoicePayment;
+
+create() ::= <<
+  INSERT INTO invoice_payments(invoice_id, payment_id, payment_date, amount, currency)
+  VALUES(:invoiceId, :paymentId, :paymentDate, :amount, :currency)
+>>
+
+update() ::= <<
+  UPDATE invoice_payments
+  SET payment_date = :paymentDate, amount = :amount, currency = :currency
+  WHERE invoice_id = :invoiceId, payment_id = :paymentId
+>>
+
+getById() ::= <<
+  SELECT invoice_id, payment_id, payment_date, amount, currency
+  FROM invoice_payments
+  WHERE payment_id = :id
+>>
+
+get() ::= <<
+  SELECT invoice_id, payment_id, payment_date, amount, currency
+  FROM invoice_payments
+>>
+
+getPaymentsForInvoice() ::= <<
+  SELECT invoice_id, payment_id, payment_date, amount, currency
+  FROM invoice_payments
+  WHERE invoice_id = :invoiceId
+>>
+
+test() ::= <<
+  SELECT 1 FROM invoice_payments;
+>>
+;
\ No newline at end of file
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceSqlDao.sql.stg
index 8766e18..f7e94e1 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/InvoiceSqlDao.sql.stg
@@ -1,8 +1,7 @@
 group InvoiceDao;
 
 get() ::= <<
-  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount,
-         SUM(ip.amount) AS amount_paid, MAX(ip.payment_date) AS last_payment_attempt
+  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount
   FROM invoices i
   LEFT JOIN invoice_payments ip ON ip.invoice_id = i.id
   LEFT JOIN invoice_items ii ON ii.invoice_id = i.id
@@ -11,8 +10,7 @@ get() ::= <<
 >>
 
 getInvoicesByAccount() ::= <<
-  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount,
-         SUM(ip.amount) AS amount_paid, MAX(ip.payment_date) AS last_payment_attempt
+  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount
   FROM invoices i
   LEFT JOIN invoice_payments ip ON ip.invoice_id = i.id
   LEFT JOIN invoice_items ii ON ii.invoice_id = i.id
@@ -22,8 +20,7 @@ getInvoicesByAccount() ::= <<
 >>
 
 getInvoicesBySubscription() ::= <<
-  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount,
-         SUM(ip.amount) AS amount_paid, MAX(ip.payment_date) AS last_payment_attempt
+  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount
   FROM invoices i
   LEFT JOIN invoice_items ii ON i.id = ii.invoice_id
   LEFT JOIN invoice_payments ip ON ip.invoice_id = i.id
@@ -43,8 +40,7 @@ getInvoicesForPayment() ::= <<
 >>
 
 getById() ::= <<
-  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount,
-         SUM(ip.amount) AS amount_paid, MAX(ip.payment_date) AS last_payment_attempt
+  SELECT i.id, i.account_id, i.invoice_date, i.target_date, i.currency, SUM(ii.amount) AS amount
   FROM invoices i
   LEFT JOIN invoice_items ii ON i.id = ii.invoice_id
   LEFT JOIN invoice_payments ip ON ip.invoice_id = i.id
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 268aaf5..7d41ed8 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -24,7 +24,7 @@ import com.google.inject.Injector;
 import com.google.inject.Stage;
 import com.ning.billing.invoice.glue.InvoiceModuleMock;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.BusService;
 
 import static org.testng.Assert.fail;
 
@@ -47,7 +47,7 @@ public abstract class InvoiceDaoTestBase {
 
             invoiceItemDao = module.getInvoiceItemDao();
 
-            EventBusService busService = injector.getInstance(EventBusService.class);
+            BusService busService = injector.getInstance(BusService.class);
             ((DefaultEventBusService) busService).startBus();
         }
         catch (Throwable t) {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
index 3fe72ed..854d945 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import org.joda.time.DateTime;
-import org.joda.time.Days;
 import org.testng.annotations.Test;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
@@ -67,15 +66,15 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         DateTime startDate = new DateTime(2010, 1, 1, 0, 0, 0, 0);
         DateTime endDate = new DateTime(2010, 4, 1, 0, 0, 0, 0);
         InvoiceItem invoiceItem = new DefaultInvoiceItem(invoiceId, subscriptionId, startDate, endDate, "test", new BigDecimal("21.00"), new BigDecimal("7.00"), Currency.USD);
-        invoice.add(invoiceItem);
+        invoice.addInvoiceItem(invoiceItem);
         invoiceDao.create(invoice);
 
         Invoice savedInvoice = invoiceDao.getById(invoiceId.toString());
         assertNotNull(savedInvoice);
         assertEquals(savedInvoice.getTotalAmount().compareTo(new BigDecimal("21.00")), 0);
-        assertEquals(savedInvoice.getAmountOutstanding().compareTo(new BigDecimal("21.00")), 0);
+        assertEquals(savedInvoice.getBalance().compareTo(new BigDecimal("21.00")), 0);
         assertEquals(savedInvoice.getAmountPaid(), BigDecimal.ZERO);
-        assertEquals(savedInvoice.getItems().size(), 1);
+        assertEquals(savedInvoice.getInvoiceItems().size(), 1);
 
         BigDecimal paymentAmount = new BigDecimal("11.00");
         String paymentId = UUID.randomUUID().toString();
@@ -83,9 +82,9 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
 
         Invoice retrievedInvoice = invoiceDao.getById(invoiceId.toString());
         assertNotNull(retrievedInvoice);
-        assertEquals(retrievedInvoice.getItems().size(), 1);
+        assertEquals(retrievedInvoice.getInvoiceItems().size(), 1);
         assertEquals(retrievedInvoice.getTotalAmount().compareTo(new BigDecimal("21.00")), 0);
-        assertEquals(retrievedInvoice.getAmountOutstanding().compareTo(new BigDecimal("10.00")), 0);
+        assertEquals(retrievedInvoice.getBalance().compareTo(new BigDecimal("10.00")), 0);
         assertEquals(retrievedInvoice.getAmountPaid().compareTo(new BigDecimal("11.00")), 0);
     }
 
@@ -111,6 +110,7 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         invoice = invoiceDao.getById(invoice.getId().toString());
         assertEquals(invoice.getAmountPaid().compareTo(paymentAmount), 0);
         assertTrue(invoice.getLastPaymentAttempt().equals(paymentAttemptDate));
+        assertEquals(invoice.getNumberOfPayments(), 1);
     }
 
     @Test
@@ -162,7 +162,7 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         BigDecimal amount = rate.multiply(new BigDecimal("3.0"));
 
         DefaultInvoiceItem item = new DefaultInvoiceItem(invoiceId, subscriptionId, targetDate, endDate, "test", amount, rate, Currency.USD);
-        invoice.add(item);
+        invoice.addInvoiceItem(item);
         invoiceDao.create(invoice);
 
         // ensure that the number of invoices for payment has increased by 1
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
new file mode 100644
index 0000000..c9f5aa9
--- /dev/null
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import org.skife.config.ConfigurationObjectFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Stage;
+import com.ning.billing.config.CatalogConfig;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.lifecycle.KillbillService.ServiceException;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.DefaultClock;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.MemoryEventBus;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+public class TestNextBillingDateNotifier {
+
+	private Clock clock;
+	private NextBillingDateNotifier notifier;
+	
+	@BeforeClass(groups={"setup"})
+	public void setup() throws ServiceException {
+		//TestApiBase.loadSystemPropertiesFromClasspath("/entitlement.properties");
+        final Injector g = Guice.createInjector(Stage.PRODUCTION, new AbstractModule() {
+			protected void configure() {
+				 bind(Clock.class).to(DefaultClock.class).asEagerSingleton();
+				 bind(NextBillingDateNotifier.class).to(DefaultNextBillingDateNotifier.class).asEagerSingleton();
+				 bind(Bus.class).to(MemoryEventBus.class).asEagerSingleton();
+				 bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+				 final InvoiceConfig config = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
+			        bind(InvoiceConfig.class).toInstance(config);
+			}  	
+        });
+
+        notifier = g.getInstance(NextBillingDateNotifier.class);
+        clock = g.getInstance(Clock.class);
+       
+	}
+	
+	@Test(enabled=false, groups="fast")
+	public void test() {
+		
+	}
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
index 5d36d55..67de300 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
 
 import com.ning.billing.catalog.MockCatalog;
 import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.Catalog;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.PlanPhase;
@@ -44,9 +45,10 @@ import com.ning.billing.invoice.model.DefaultInvoiceItem;
 import com.ning.billing.invoice.model.InvoiceGenerator;
 import com.ning.billing.invoice.model.InvoiceItemList;
 
-@Test(groups = {"invoicing", "invoiceGenerator"})
+@Test(groups = {"fast", "invoicing", "invoiceGenerator"})
 public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
     private final InvoiceGenerator generator = new DefaultInvoiceGenerator();
+    private final MockCatalog catalog = new MockCatalog();
 
     @Test
     public void testWithNullEventSetAndNullInvoiceSet() {
@@ -82,8 +84,9 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         PlanPhase phase = plan.getAllPhases()[0];
         
         BillingEvent event = new DefaultBillingEvent(sub, startDate, plan, phase,
-                new InternationalPriceMock(ZERO),new InternationalPriceMock(TEN), BillingPeriod.MONTHLY,
-                                               1, BillingModeType.IN_ADVANCE, "Test");
+                                                     new InternationalPriceMock(ZERO),
+                                                     new InternationalPriceMock(TEN), BillingPeriod.MONTHLY,
+                                                     1, BillingModeType.IN_ADVANCE, "Test");
         events.add(event);
 
         InvoiceItemList existingInvoiceItems = new InvoiceItemList();
@@ -108,8 +111,9 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         PlanPhase phase = plan.getAllPhases()[0];
         BigDecimal rate = TEN;
         BillingEvent event = new DefaultBillingEvent(sub, startDate, plan, phase,
-                new InternationalPriceMock(ZERO), new InternationalPriceMock(rate), BillingPeriod.MONTHLY,
-                                               15, BillingModeType.IN_ADVANCE,"Test");
+                                                     new InternationalPriceMock(ZERO),
+                                                     new InternationalPriceMock(rate), BillingPeriod.MONTHLY,
+                                                     15, BillingModeType.IN_ADVANCE,"Test");
         events.add(event);
 
         InvoiceItemList existingInvoiceItems = new InvoiceItemList();
@@ -131,7 +135,6 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
     public void testTwoMonthlySubscriptionsWithAlignedBillingDates() {
         BillingEventSet events = new BillingEventSet();
 
-        MockCatalog catalog = new MockCatalog();
         Plan plan1 = catalog.getCurrentPlans()[0];
         PlanPhase phase1 = plan1.getAllPhases()[0];
         Plan plan2 = catalog.getCurrentPlans()[1];
@@ -139,16 +142,16 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         
         Subscription sub = new SubscriptionData(new SubscriptionBuilder().setId(UUID.randomUUID()));
 
-        BillingEvent event1 = new DefaultBillingEvent(sub, buildDateTime(2011, 9, 1),
-                                               plan1,phase1,
-                                               new InternationalPriceMock(ZERO), new InternationalPriceMock(FIVE), BillingPeriod.MONTHLY,
-                                               1, BillingModeType.IN_ADVANCE, "Test");
+        BillingEvent event1 = new DefaultBillingEvent(sub, buildDateTime(2011, 9, 1), plan1, phase1,
+                                                      new InternationalPriceMock(ZERO),
+                                                      new InternationalPriceMock(FIVE), BillingPeriod.MONTHLY,
+                                                      1, BillingModeType.IN_ADVANCE, "Test");
         events.add(event1);
 
-        BillingEvent event2 = new DefaultBillingEvent(sub, buildDateTime(2011, 10, 1),
-                                               plan2,phase2,
-                                               new InternationalPriceMock(ZERO), new InternationalPriceMock(TEN), BillingPeriod.MONTHLY,
-                                               1, BillingModeType.IN_ADVANCE,"Test");
+        BillingEvent event2 = new DefaultBillingEvent(sub, buildDateTime(2011, 10, 1), plan2, phase2,
+                                                      new InternationalPriceMock(ZERO),
+                                                      new InternationalPriceMock(TEN), BillingPeriod.MONTHLY,
+                                                      1, BillingModeType.IN_ADVANCE, "Test");
         events.add(event2);
 
         InvoiceItemList existingInvoiceItems = new InvoiceItemList();
@@ -158,7 +161,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
 
         assertNotNull(invoice);
         assertEquals(invoice.getNumberOfItems(), 2);
-        assertEquals(invoice.getTotalAmount(), FIVE.multiply(TWO).add(TEN).setScale(NUMBER_OF_DECIMALS));
+        assertEquals(invoice.getTotalAmount(), FIVE.add(TEN).setScale(NUMBER_OF_DECIMALS));
     }
 
     @Test
@@ -283,35 +286,37 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         // plan 5: addon to plan 2, with bill cycle alignment to plan; immediate cancellation
 
         UUID subscriptionId1 = UUID.randomUUID();
-        String planName1 = "Change from trial to discount with immediate cancellation";
-        String plan1PhaseName1 = "Trial"; String plan1PhaseName2 = "Discount"; String plan1phase3 = "Cancel";
+        Plan plan1 = catalog.getCurrentPlans()[0];
+        PlanPhase plan1Phase1 = plan1.getAllPhases()[0]; PlanPhase plan1Phase2 = plan1.getAllPhases()[0]; PlanPhase plan1Phase3 = plan1.getAllPhases()[0];
         DateTime plan1StartDate = buildDateTime(2011, 1, 5);
         DateTime plan1PhaseChangeDate = buildDateTime(2011, 4, 5);
         DateTime plan1CancelDate = buildDateTime(2011, 4, 29);
 
         UUID subscriptionId2 = UUID.randomUUID();
-        String planName2 = "Change phase from trial to discount to evergreen";
-        String plan2PhaseName1 = "Trial"; String plan2PhaseName2 = "Discount"; String plan2PhaseName3 = "Evergreen";
+        Plan plan2 = catalog.getCurrentPlans()[1];
+        PlanPhase plan2Phase1 = plan2.getAllPhases()[0]; PlanPhase plan2Phase2 = plan2.getAllPhases()[0]; PlanPhase plan2Phase3 = plan2.getAllPhases()[0];
         DateTime plan2StartDate = buildDateTime(2011, 3, 10);
         DateTime plan2PhaseChangeToDiscountDate = buildDateTime(2011, 6, 10);
         DateTime plan2PhaseChangeToEvergreenDate = buildDateTime(2011, 9, 10);
 
         UUID subscriptionId3 = UUID.randomUUID();
-        String planName3 = "Upgrade with immediate change, BCD = 31";
-        String plan3PhaseName1 = "Evergreen monthly"; String plan3PhaseName2 = "Evergreen annual";
+        Plan plan3 = catalog.getCurrentPlans()[2];
+        PlanPhase plan3Phase1 = plan3.getAllPhases()[0]; PlanPhase plan3Phase2 = plan3.getAllPhases()[0];
         DateTime plan3StartDate = buildDateTime(2011, 5, 20);
         DateTime plan3UpgradeToAnnualDate = buildDateTime(2011, 7, 31);
 
         UUID subscriptionId4 = UUID.randomUUID();
-        String planName4a = "Plan change effective EOT; plan 1";
-        String planName4b = "Plan change effective EOT; plan 2";
-        String plan4PhaseName = "Evergreen";
+        Plan plan4a = catalog.getCurrentPlans()[0];
+        Plan plan4b = catalog.getCurrentPlans()[1];
+        PlanPhase plan4aPhase1 = plan4a.getAllPhases()[0];
+        PlanPhase plan4bPhase1 = plan4b.getAllPhases()[0];
+
         DateTime plan4StartDate = buildDateTime(2011, 6, 7);
         DateTime plan4ChangeOfPlanDate = buildDateTime(2011, 8, 7);
 
         UUID subscriptionId5 = UUID.randomUUID();
-        String planName5 = "Add-on";
-        String plan5PhaseName1 = "Evergreen"; String plan5PhaseName2 = "Cancel";
+        Plan plan5 = catalog.getCurrentPlans()[2];
+        PlanPhase plan5Phase1 = plan5.getAllPhases()[0]; PlanPhase plan5Phase2 = plan5.getAllPhases()[0];
         DateTime plan5StartDate = buildDateTime(2011, 6, 21);
         DateTime plan5CancelDate = buildDateTime(2011, 10, 7);
 
@@ -320,7 +325,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         BillingEventSet events = new BillingEventSet();
 
         // on 1/5/2011, create subscription 1 (trial)
-        events.add(createBillingEvent(subscriptionId1, plan1StartDate, planName1, plan1PhaseName1, EIGHT, 5));
+        events.add(createBillingEvent(subscriptionId1, plan1StartDate, plan1, plan1Phase1, EIGHT, 5));
         expectedAmount = EIGHT;
         testInvoiceGeneration(events, invoiceItems, plan1StartDate, 1, expectedAmount);
 
@@ -333,12 +338,12 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 3, 5), 1, expectedAmount);
 
         // on 3/10/2011, create subscription 2 (trial)
-        events.add(createBillingEvent(subscriptionId2, plan2StartDate, planName2, plan2PhaseName1, TWENTY, 10));
+        events.add(createBillingEvent(subscriptionId2, plan2StartDate, plan2, plan2Phase1, TWENTY, 10));
         expectedAmount = TWENTY;
         testInvoiceGeneration(events, invoiceItems, plan2StartDate, 1, expectedAmount);
 
         // on 4/5/2011, invoice subscription 1 (discount)
-        events.add(createBillingEvent(subscriptionId1, plan1PhaseChangeDate, planName1, plan1PhaseName2, TWELVE, 5));
+        events.add(createBillingEvent(subscriptionId1, plan1PhaseChangeDate, plan1, plan1Phase2, TWELVE, 5));
         expectedAmount = TWELVE;
         testInvoiceGeneration(events, invoiceItems, plan1PhaseChangeDate, 1, expectedAmount);
 
@@ -347,7 +352,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 4, 10), 1, expectedAmount);
 
         // on 4/29/2011, cancel subscription 1
-        events.add(createBillingEvent(subscriptionId1, plan1CancelDate, planName1, plan1phase3, ZERO, 5));
+        events.add(createBillingEvent(subscriptionId1, plan1CancelDate, plan1, plan1Phase3, ZERO, 5));
         expectedAmount = TWELVE.multiply(SIX.divide(THIRTY, NUMBER_OF_DECIMALS, ROUNDING_METHOD)).negate().setScale(NUMBER_OF_DECIMALS);
         testInvoiceGeneration(events, invoiceItems, plan1CancelDate, 2, expectedAmount);
 
@@ -356,17 +361,17 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 5, 10), 1, expectedAmount);
 
         // on 5/20/2011, create subscription 3 (monthly)
-        events.add(createBillingEvent(subscriptionId3, plan3StartDate, planName3, plan3PhaseName1, TEN, 20));
+        events.add(createBillingEvent(subscriptionId3, plan3StartDate, plan3, plan3Phase1, TEN, 20));
         expectedAmount = TEN;
         testInvoiceGeneration(events, invoiceItems, plan3StartDate, 1, expectedAmount);
 
         // on 6/7/2011, create subscription 4
-        events.add(createBillingEvent(subscriptionId4, plan4StartDate, planName4a, plan4PhaseName, FIFTEEN, 7));
+        events.add(createBillingEvent(subscriptionId4, plan4StartDate, plan4a, plan4aPhase1, FIFTEEN, 7));
         expectedAmount = FIFTEEN;
         testInvoiceGeneration(events, invoiceItems, plan4StartDate, 1, expectedAmount);
 
         // on 6/10/2011, invoice subscription 2 (discount)
-        events.add(createBillingEvent(subscriptionId2, plan2PhaseChangeToDiscountDate, planName2, plan2PhaseName2, THIRTY, 10));
+        events.add(createBillingEvent(subscriptionId2, plan2PhaseChangeToDiscountDate, plan2, plan2Phase2, THIRTY, 10));
         expectedAmount = THIRTY;
         testInvoiceGeneration(events, invoiceItems, plan2PhaseChangeToDiscountDate, 1, expectedAmount);
 
@@ -375,7 +380,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 6, 20), 1, expectedAmount);
 
         // on 6/21/2011, create add-on (subscription 5)
-        events.add(createBillingEvent(subscriptionId5, plan5StartDate, planName5, plan5PhaseName1, TWENTY, 10));
+        events.add(createBillingEvent(subscriptionId5, plan5StartDate, plan5, plan5Phase1, TWENTY, 10));
         expectedAmount = TWENTY.multiply(NINETEEN.divide(THIRTY, NUMBER_OF_DECIMALS, ROUNDING_METHOD)).setScale(NUMBER_OF_DECIMALS);
         testInvoiceGeneration(events, invoiceItems, plan5StartDate, 1, expectedAmount);
 
@@ -392,14 +397,14 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 7, 20), 1, expectedAmount);
 
         // on 7/31/2011, convert subscription 3 to annual
-        events.add(createAnnualBillingEvent(subscriptionId3, plan3UpgradeToAnnualDate, planName3, plan3PhaseName2, ONE_HUNDRED, 31));
+        events.add(createAnnualBillingEvent(subscriptionId3, plan3UpgradeToAnnualDate, plan3, plan3Phase2, ONE_HUNDRED, 31));
         expectedAmount = ONE_HUNDRED.subtract(TEN);
         expectedAmount = expectedAmount.add(TEN.multiply(ELEVEN.divide(THIRTY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD)));
         expectedAmount = expectedAmount.setScale(NUMBER_OF_DECIMALS);
         testInvoiceGeneration(events, invoiceItems, plan3UpgradeToAnnualDate, 3, expectedAmount);
 
         // on 8/7/2011, invoice subscription 4 (plan 2)
-        events.add(createBillingEvent(subscriptionId4, plan4ChangeOfPlanDate, planName4b, plan4PhaseName, TWENTY_FOUR, 7));
+        events.add(createBillingEvent(subscriptionId4, plan4ChangeOfPlanDate, plan4b, plan4bPhase1, TWENTY_FOUR, 7));
         expectedAmount = TWENTY_FOUR;
         testInvoiceGeneration(events, invoiceItems, plan4ChangeOfPlanDate, 1, expectedAmount);
 
@@ -412,12 +417,12 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 9, 7), 1, expectedAmount);
 
         // on 9/10/2011, invoice plan 2 (evergreen), invoice subscription 5
-        events.add(createBillingEvent(subscriptionId2, plan2PhaseChangeToEvergreenDate, planName2, plan2PhaseName3, FORTY, 10));
+        events.add(createBillingEvent(subscriptionId2, plan2PhaseChangeToEvergreenDate, plan2, plan2Phase3, FORTY, 10));
         expectedAmount = FORTY.add(TWENTY);
         testInvoiceGeneration(events, invoiceItems, plan2PhaseChangeToEvergreenDate, 2, expectedAmount);
 
         // on 10/7/2011, invoice subscription 4 (plan 2), cancel subscription 5
-        events.add(createBillingEvent(subscriptionId5, plan5CancelDate, planName5, plan5PhaseName2, ZERO, 10));
+        events.add(createBillingEvent(subscriptionId5, plan5CancelDate, plan5, plan5Phase2, ZERO, 10));
         expectedAmount = TWENTY_FOUR.add(TWENTY.multiply(THREE.divide(THIRTY)).negate().setScale(NUMBER_OF_DECIMALS));
         testInvoiceGeneration(events, invoiceItems, plan5CancelDate, 3, expectedAmount);
 
@@ -426,33 +431,34 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         testInvoiceGeneration(events, invoiceItems, buildDateTime(2011, 10, 10), 1, expectedAmount);
     }
 
-    private DefaultBillingEvent createBillingEvent(UUID subscriptionId, DateTime startDate, String planName, String planPhaseName,
-                                            BigDecimal rate, int billCycleDay) {
-        MockCatalog catalog = new MockCatalog();
-        Plan plan = catalog.getCurrentPlans()[0];
-        PlanPhase phase = plan.getAllPhases()[0];
-        Subscription sub = new SubscriptionData(new SubscriptionBuilder().setId(UUID.randomUUID()));
-        return new DefaultBillingEvent(sub, startDate, plan, phase,
-                new InternationalPriceMock(ZERO),new InternationalPriceMock(rate), BillingPeriod.MONTHLY,
-                                billCycleDay, BillingModeType.IN_ADVANCE,"Test");
+    private DefaultBillingEvent createBillingEvent(final UUID subscriptionId, final DateTime startDate,
+                                                   final Plan plan, final PlanPhase planPhase,
+                                                   final BigDecimal rate, final int billCycleDay) {
+        Subscription sub = new SubscriptionData(new SubscriptionBuilder().setId(subscriptionId));
+
+        return new DefaultBillingEvent(sub, startDate, plan, planPhase,
+                                       new InternationalPriceMock(ZERO),
+                                       new InternationalPriceMock(rate), BillingPeriod.MONTHLY,
+                                       billCycleDay, BillingModeType.IN_ADVANCE,"Test");
     }
 
-    private DefaultBillingEvent createAnnualBillingEvent(UUID subscriptionId, DateTime startDate, String planName, String planPhaseName,
-                                                  BigDecimal rate, int billCycleDay) {
-        MockCatalog catalog = new MockCatalog();
-        Plan plan = catalog.getCurrentPlans()[0];
-        PlanPhase phase = plan.getAllPhases()[0];
-        Subscription sub = new SubscriptionData(new SubscriptionBuilder().setId(UUID.randomUUID()));
-        return new DefaultBillingEvent(sub, startDate, plan, phase,
-                new InternationalPriceMock(ZERO),new InternationalPriceMock(rate), BillingPeriod.ANNUAL,
-                                billCycleDay, BillingModeType.IN_ADVANCE,"Test");
+    private DefaultBillingEvent createAnnualBillingEvent(final UUID subscriptionId, final DateTime startDate,
+                                                         final Plan plan, final PlanPhase planPhase,
+                                                         final BigDecimal rate, final int billCycleDay) {
+        Subscription sub = new SubscriptionData(new SubscriptionBuilder().setId(subscriptionId));
+        return new DefaultBillingEvent(sub, startDate, plan, planPhase,
+                                       new InternationalPriceMock(ZERO),
+                                       new InternationalPriceMock(rate), BillingPeriod.ANNUAL,
+                                       billCycleDay, BillingModeType.IN_ADVANCE,"Test");
     }
 
-    private void testInvoiceGeneration(BillingEventSet events, InvoiceItemList existingInvoiceItems, DateTime targetDate, int expectedNumberOfItems, BigDecimal expectedAmount) {
+    private void testInvoiceGeneration(final BillingEventSet events, final InvoiceItemList existingInvoiceItems,
+                                       final DateTime targetDate, final int expectedNumberOfItems,
+                                       final BigDecimal expectedAmount) {
         Currency currency = Currency.USD;
         UUID accountId = UUID.randomUUID();
         Invoice invoice = generator.generateInvoice(accountId, events, existingInvoiceItems, targetDate, currency);
-        existingInvoiceItems.addAll(invoice.getItems());
+        existingInvoiceItems.addAll(invoice.getInvoiceItems());
         assertNotNull(invoice);
         assertEquals(invoice.getNumberOfItems(), expectedNumberOfItems);
         assertEquals(invoice.getTotalAmount(), expectedAmount);
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
index 84bf795..64fa95c 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class DoubleProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/GenericProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/GenericProRationTests.java
index f612f10..8d0eb06 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/GenericProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/GenericProRationTests.java
@@ -22,7 +22,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class GenericProRationTests extends GenericProRationTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/LeadingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/LeadingProRationTests.java
index f44876e..a009fba 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/LeadingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/LeadingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class LeadingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/ProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/ProRationTests.java
index 274a8e7..f882005 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/ProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/ProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class ProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/TrailingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/TrailingProRationTests.java
index 0689c69..e0216b5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/TrailingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/TrailingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class TrailingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/GenericProRationTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/GenericProRationTestBase.java
index d4e4c24..60892e6 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/GenericProRationTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/GenericProRationTestBase.java
@@ -22,7 +22,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public abstract class GenericProRationTestBase extends ProRationInAdvanceTestBase {
     /**
      * used for testing cancellation in less than a single billing period
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/DoubleProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/DoubleProRationTests.java
index 4799e40..240b8aa 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/DoubleProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/DoubleProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class DoubleProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/GenericProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/GenericProRationTests.java
index eb3b1c9..8b40db8 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/GenericProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/GenericProRationTests.java
@@ -22,7 +22,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class GenericProRationTests extends GenericProRationTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/LeadingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/LeadingProRationTests.java
index 5db6911..998e566 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/LeadingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/LeadingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class LeadingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/ProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/ProRationTests.java
index 13c75df..c3748d1 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/ProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/ProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class ProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/TrailingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/TrailingProRationTests.java
index 9a8e635..6a5e5ef 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/TrailingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/monthly/TrailingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class TrailingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ProRationInAdvanceTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ProRationInAdvanceTestBase.java
index 14392a8..18bd096 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ProRationInAdvanceTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ProRationInAdvanceTestBase.java
@@ -21,7 +21,7 @@ import com.ning.billing.invoice.model.InAdvanceBillingMode;
 import com.ning.billing.invoice.tests.ProRationTestBase;
 import org.testng.annotations.Test;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public abstract class ProRationInAdvanceTestBase extends ProRationTestBase {
     @Override
     protected BillingMode getBillingMode() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
index 68e29fd..c1d6085 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class DoubleProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/GenericProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/GenericProRationTests.java
index 8306716..c4237a6 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/GenericProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/GenericProRationTests.java
@@ -22,7 +22,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class GenericProRationTests extends GenericProRationTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/LeadingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/LeadingProRationTests.java
index fe614a9..04ec683 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/LeadingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/LeadingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class LeadingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/ProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/ProRationTests.java
index c99aa5c..e13db0d 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/ProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/ProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class ProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/TrailingProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/TrailingProRationTests.java
index 89e138e..8f63010 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/TrailingProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/TrailingProRationTests.java
@@ -24,7 +24,7 @@ import org.testng.annotations.Test;
 
 import java.math.BigDecimal;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class TrailingProRationTests extends ProRationInAdvanceTestBase {
     @Override
     protected BillingPeriod getBillingPeriod() {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ValidationProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ValidationProRationTests.java
index 9de01bd..b38d076 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ValidationProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/ValidationProRationTests.java
@@ -28,7 +28,7 @@ import java.math.BigDecimal;
 
 import static org.testng.Assert.assertEquals;
 
-@Test(groups = {"invoicing", "proRation"})
+@Test(groups = {"fast", "invoicing", "proRation"})
 public class ValidationProRationTests extends ProRationTestBase {
     protected BillingPeriod getBillingPeriod() {
         return BillingPeriod.MONTHLY;
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
index afcae5e..01ad9b8 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/DefaultEventBusService.java
@@ -20,14 +20,14 @@ import com.google.inject.Inject;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 
-public class DefaultEventBusService implements EventBusService {
+public class DefaultEventBusService implements BusService {
 
     private final static String EVENT_BUS_SERVICE = "eventbus-service";
 
-    private final EventBus eventBus;
+    private final Bus eventBus;
 
     @Inject
-    public DefaultEventBusService(EventBus eventBus) {
+    public DefaultEventBusService(Bus eventBus) {
         this.eventBus = eventBus;
     }
 
@@ -47,7 +47,7 @@ public class DefaultEventBusService implements EventBusService {
     }
 
     @Override
-    public EventBus getEventBus() {
+    public Bus getBus() {
         return eventBus;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
index 259736d..7312e5b 100644
--- a/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
+++ b/util/src/main/java/com/ning/billing/util/eventbus/MemoryEventBus.java
@@ -26,7 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class MemoryEventBus implements EventBus {
+public class MemoryEventBus implements Bus {
 
     // STEPH config ?
     private final static int MAX_EVENT_THREADS = 1;
@@ -92,13 +92,13 @@ public class MemoryEventBus implements EventBus {
     }
 
     @Override
-    public void post(EventBusNotification event) throws EventBusException {
+    public void post(BusEvent event) throws EventBusException {
         checkInitialized("post");
         delegate.post(event);
     }
 
     @Override
-    public void postFromTransaction(EventBusNotification event, Transmogrifier dao) throws EventBusException {
+    public void postFromTransaction(BusEvent event, Transmogrifier dao) throws EventBusException {
         checkInitialized("postFromTransaction");
         delegate.post(event);
     }
diff --git a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
index 078c331..8e6830e 100644
--- a/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/EventBusModule.java
@@ -18,16 +18,16 @@ package com.ning.billing.util.glue;
 
 import com.google.inject.AbstractModule;
 import com.ning.billing.util.eventbus.DefaultEventBusService;
-import com.ning.billing.util.eventbus.EventBus;
-import com.ning.billing.util.eventbus.EventBusService;
+import com.ning.billing.util.eventbus.Bus;
+import com.ning.billing.util.eventbus.BusService;
 import com.ning.billing.util.eventbus.MemoryEventBus;
 
 public class EventBusModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        bind(EventBusService.class).to(DefaultEventBusService.class);
-        bind(EventBus.class).to(MemoryEventBus.class).asEagerSingleton();
+        bind(BusService.class).to(DefaultEventBusService.class);
+        bind(Bus.class).to(MemoryEventBus.class).asEagerSingleton();
 
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 80f7385..2f18379 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -40,6 +40,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         this.dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
+
+
     @Override
     protected void doProcessEvents(int sequenceId) {
         List<Notification> notifications = getReadyNotifications(sequenceId);
@@ -116,4 +118,5 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         }
         return result;
     }
+    
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
new file mode 100644
index 0000000..4e771ba
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationError.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+public class NotificationError extends Error {
+
+    private static final long serialVersionUID = 131398536;
+
+    public NotificationError() {
+        super();
+    }
+
+    public NotificationError(String msg, Throwable arg1) {
+        super(msg, arg1);
+    }
+
+    public NotificationError(String msg) {
+        super(msg);
+    }
+
+    public NotificationError(Throwable msg) {
+        super(msg);
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 23f0de0..4ea38f7 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -42,18 +42,17 @@ public interface NotificationQueue {
    public void processReadyNotification();
 
    /**
-    * Stops the queue.
+    * Stops the queue. Blocks until queue is completely stopped.
     *
     * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
     */
    public void stopQueue();
 
    /**
-    * Starts the queue.
+    * Starts the queue. Blocks until queue has completely started.
     *
     * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
     */
    public void startQueue();
 
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 6eaf33f..9a42d2e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -17,34 +17,28 @@
 package com.ning.billing.util.notificationq;
 
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.util.Hostname;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 
 public abstract class NotificationQueueBase implements NotificationQueue {
 
     protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
 
+    private static final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
+    private static final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
+    private static final long NANO_TO_MS = (1000 * 1000);
+
     protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
     protected final long STOP_WAIT_TIMEOUT_MS = 60000;
 
@@ -63,7 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     // Use this object's monitor for synchronization (no need for volatile)
     protected boolean isProcessingEvents;
-
+    
+    private boolean startedComplete = false;
+    private boolean stoppedComplete = false;
+    
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         this.clock = clock;
@@ -92,14 +89,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     @Override
     public void processReadyNotification() {
-        // STEPH to be implemented
+        doProcessEvents(sequenceId.incrementAndGet());
     }
 
 
     @Override
     public void stopQueue() {
         if (config.isNotificationProcessingOff()) {
-            handler.completedQueueStop();
+            completedQueueStop();
             return;
         }
 
@@ -113,7 +110,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                 log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
             }
         }
-
+        waitForNotificationStopCompletion();
     }
 
     @Override
@@ -125,7 +122,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
         if (config.isNotificationProcessingOff()) {
             log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
-            handler.completedQueueStart();
+            completedQueueStart();
             return;
         }
         final NotificationQueueBase notificationQueue = this;
@@ -139,7 +136,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                         Thread.currentThread().getId()));
 
                 // Thread is now started, notify the listener
-                handler.completedQueueStart();
+                completedQueueStart();
 
                 try {
                     while (true) {
@@ -171,7 +168,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                     // Just to make it really obvious in the log
                     e.printStackTrace();
                 } finally {
-                    handler.completedQueueStop();
+                    completedQueueStop();
                     log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
                             Thread.currentThread().getName(),
                             Thread.currentThread().getId()));
@@ -182,8 +179,59 @@ public abstract class NotificationQueueBase implements NotificationQueue {
                 Thread.sleep(config.getNotificationSleepTimeMs());
             }
         });
+        waitForNotificationStartCompletion();
+    }
+    
+    private void completedQueueStop() {
+    	synchronized (this) {
+    		stoppedComplete = true;
+            this.notifyAll();
+        }
+    }
+    
+    private void completedQueueStart() {
+        synchronized (this) {
+        	startedComplete = true;
+            this.notifyAll();
+        }
     }
 
+    private void waitForNotificationStartCompletion() {
+        waitForNotificationEventCompletion(true);
+    }
+
+    private void waitForNotificationStopCompletion() {
+        waitForNotificationEventCompletion(false);
+    }
+
+    private void waitForNotificationEventCompletion(boolean startEvent) {
+
+        long ini = System.nanoTime();
+        synchronized(this) {
+            do {
+                if ((startEvent ? startedComplete : stoppedComplete)) {
+                    break;
+                }
+                try {
+                    this.wait(NOTIFICATION_THREAD_WAIT_INCREMENT_MS);
+                } catch (InterruptedException e ) {
+                    Thread.currentThread().interrupt();
+                    throw new NotificationError(e);
+                }
+            } while (!(startEvent ? startedComplete : stoppedComplete) &&
+                    (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
+
+            if (!(startEvent ? startedComplete : stoppedComplete)) {
+                log.error("Could not {} notification thread in {} msec !!!",
+                        (startEvent ? "start" : "stop"),
+                        MAX_NOTIFICATION_THREAD_WAIT_MS);
+                throw new NotificationError("Failed to start service!!");
+            }
+            log.info("Notification thread has been {} in {} ms",
+                    (startEvent ? "started" : "stopped"),
+                    (System.nanoTime() - ini) / NANO_TO_MS);
+        }
+    }
 
     protected String getFullQName() {
         return svcName + ":" +  queueName;
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index a18906b..c1feca1 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -23,21 +23,12 @@ public interface NotificationQueueService {
 
     public interface NotificationQueueHandler {
         /**
-         * Called when the Notification thread has been started
-         */
-        public void completedQueueStart();
-
-        /**
          * Called for each notification ready
          *
          * @param key the notification key associated to that notification entry
          */
         public void handleReadyNotification(String notificationKey);
-        /**
-         * Called right before the Notification thread is about to exit
-         */
-        public void completedQueueStop();
-    }
+     }
 
     public static final class NotficationQueueAlreadyExists extends Exception {
         private static final long serialVersionUID = 1541281L;
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index 4b0f4a2..0091ab6 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -28,7 +28,7 @@ public class TestEventBus {
 
     private static final Logger log = LoggerFactory.getLogger(TestEventBus.class);
 
-    private EventBus eventBus;
+    private Bus eventBus;
 
 
     @BeforeClass
@@ -42,7 +42,7 @@ public class TestEventBus {
         eventBus.stop();
     }
 
-    public static final class MyEvent implements EventBusNotification {
+    public static final class MyEvent implements BusEvent {
         String name;
         Long value;
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 2e3bb3c..8058561 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -17,7 +17,6 @@
 package com.ning.billing.util.notificationq;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -28,18 +27,11 @@ import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
@@ -49,8 +41,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.ning.billing.dbi.DBIProvider;
-import com.ning.billing.dbi.DbiConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
@@ -59,370 +49,243 @@ import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue {
-
-    private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
-
-    @Inject
-    private DBI dbi;
-
-    @Inject
-    MysqlTestingHelper helper;
-
-    @Inject
-    private Clock clock;
-
-    private DummySqlTest dao;
-
-   // private NotificationQueue queue;
-
-    private void startMysql() throws IOException, ClassNotFoundException, SQLException {
-        final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
-        final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
-        helper.startMysql();
-        helper.initDb(ddl);
-        helper.initDb(testDdl);
-    }
-
-    @BeforeSuite(alwaysRun = true)
-    public void setup() throws Exception {
-        startMysql();
-        dao = dbi.onDemand(DummySqlTest.class);
-    }
-
-    @BeforeTest
-    public void beforeTest() {
-        dbi.withHandle(new HandleCallback<Void>() {
-
-            @Override
-            public Void withHandle(Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                handle.execute("delete from dummy");
-                return null;
-            }
-        });
-        // Reset time to real value
-        ((ClockMock) clock).resetDeltaFromReality();
-    }
-
-
-
-    /**
-     * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSimpleQueueDisabled() throws InterruptedException {
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(true, 100, 1, 10000));
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-                // Do nothing
-            }
-        });
-        assertTrue(true);
-    }
-
-    /**
-     * Test that we can post a notification in the future from a transaction and get the notification
-     * callback with the correct key when the time is ready
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSimpleNotification() throws InterruptedException {
-
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                        synchronized (expectedNotifications) {
-                            expectedNotifications.put(notificationKey, Boolean.TRUE);
-                            expectedNotifications.notify();
-                        }
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(false, 100, 1, 10000));
-
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
-                final UUID key = UUID.randomUUID();
-                final DummyObject obj = new DummyObject("foo", key);
-                final DateTime now = new DateTime();
-                final DateTime readyTime = now.plusMillis(2000);
-                final NotificationKey notificationKey = new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return key.toString();
-                    }
-                };
-                expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
-
-                // Insert dummy to be processed in 2 sec'
-                dao.inTransaction(new Transaction<Void, DummySqlTest>() {
-                    @Override
-                    public Void inTransaction(DummySqlTest transactional,
-                            TransactionStatus status) throws Exception {
-
-                        transactional.insertDummy(obj);
-                        readyQueue.recordFutureNotificationFromTransaction(transactional,
-                                readyTime, notificationKey);
-                        return null;
-                    }
-                });
-
-                // Move time in the future after the notification effectiveDate
-                ((ClockMock) clock).setDeltaFromReality(3000);
-
-                // Notification should have kicked but give it at least a sec' for thread scheduling
-                int nbTry = 1;
-                boolean success = false;
-                do {
-                    synchronized(expectedNotifications) {
-                        if (expectedNotifications.get(notificationKey.toString())) {
-                            success = true;
-                            break;
-                        }
-                        expectedNotifications.wait(1000);
-                    }
-                } while (nbTry-- > 0);
-                assertEquals(success, true);
-            }
-        });
-    }
-
-    @Test
-    public void testManyNotifications() throws InterruptedException {
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
-
-        final TestStartStop testStartStop = new TestStartStop(false, false);
-        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                new NotificationQueueHandler() {
-                    @Override
-                    public void handleReadyNotification(String notificationKey) {
-                        synchronized (expectedNotifications) {
-                            expectedNotifications.put(notificationKey, Boolean.TRUE);
-                            expectedNotifications.notify();
-                        }
-                    }
-                    @Override
-                    public void completedQueueStop() {
-                        testStartStop.stopped();
-                    }
-                    @Override
-                    public void completedQueueStart() {
-                        testStartStop.started();
-                    }
-                },
-                getNotificationConfig(false, 100, 10, 10000));
-
-
-        executeTest(testStartStop, queue, new WithTest() {
-            @Override
-            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
-
-                final DateTime now = clock.getUTCNow();
-                final int MAX_NOTIFICATIONS = 100;
-                for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
-
-                    final int nextReadyTimeIncrementMs = 1000;
-
-                    final UUID key = UUID.randomUUID();
-                    final DummyObject obj = new DummyObject("foo", key);
-                    final int currentIteration = i;
-
-                    final NotificationKey notificationKey = new NotificationKey() {
-                        @Override
-                        public String toString() {
-                            return key.toString();
-                        }
-                    };
-                    expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
-
-                    dao.inTransaction(new Transaction<Void, DummySqlTest>() {
-                        @Override
-                        public Void inTransaction(DummySqlTest transactional,
-                                TransactionStatus status) throws Exception {
-
-                            transactional.insertDummy(obj);
-                            readyQueue.recordFutureNotificationFromTransaction(transactional,
-                                    now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
-                            return null;
-                        }
-                    });
-
-                    // Move time in the future after the notification effectiveDate
-                    if (i == 0) {
-                        ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
-                    } else {
-                        ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
-                    }
-                }
-
-                // Wait a little longer since there are a lot of callback that need to happen
-                int nbTry = MAX_NOTIFICATIONS + 1;
-                boolean success = false;
-                do {
-                    synchronized(expectedNotifications) {
-
-                        Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
-                            @Override
-                            public boolean apply(Boolean input) {
-                                return input;
-                            }
-                        });
-
-                        if (completed.size() == MAX_NOTIFICATIONS) {
-                            success = true;
-                            break;
-                        }
-                        //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
-                        expectedNotifications.wait(1000);
-                    }
-                } while (nbTry-- > 0);
-                assertEquals(success, true);
-            }
-        });
-    }
-
-
-    NotificationConfig getNotificationConfig(final boolean off,
-            final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
-        return new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return off;
-            }
-            @Override
-            public long getNotificationSleepTimeMs() {
-                return sleepTime;
-            }
-            @Override
-            public int getDaoMaxReadyEvents() {
-                return maxReadyEvents;
-            }
-            @Override
-            public long getDaoClaimTimeMs() {
-                return claimTimeMs;
-            }
-        };
-    }
-
-    private static class TestStartStop {
-        private boolean started;
-        private boolean stopped;
-
-        public TestStartStop(boolean started, boolean stopped) {
-            super();
-            this.started = started;
-            this.stopped = stopped;
-        }
-
-        public void started() {
-            synchronized(this) {
-                started = true;
-                notify();
-            }
-        }
-
-        public void stopped() {
-            synchronized(this) {
-                stopped = true;
-                notify();
-            }
-        }
-
-        public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
-            return waitForEventCompletion(timeoutMs, true);
-        }
-
-        public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
-            return waitForEventCompletion(timeoutMs, false);
-        }
-
-        private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
-            DateTime init = new DateTime();
-            synchronized(this) {
-                while (! ((start ? started : stopped))) {
-                    wait(timeoutMs);
-                    if (init.plusMillis(timeoutMs).isAfterNow()) {
-                        break;
-                    }
-                }
-            }
-            return (start ? started : stopped);
-        }
-    }
-
-    private interface WithTest {
-        public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
-    }
-
-    private void executeTest(final TestStartStop testStartStop,
-            DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
-
-        queue.startQueue();
-        boolean started = testStartStop.waitForStartComplete(3000);
-        assertEquals(started, true);
-
-        test.test(queue);
-
-        queue.stopQueue();
-        boolean stopped = testStartStop.waitForStopComplete(3000);
-        assertEquals(stopped, true);
-    }
-
-
-    public static class TestNotificationQueueModule extends AbstractModule {
-        @Override
-        protected void configure() {
-
-            bind(Clock.class).to(ClockMock.class);
-
-            final MysqlTestingHelper helper = new MysqlTestingHelper();
-            bind(MysqlTestingHelper.class).toInstance(helper);
-            DBI dbi = helper.getDBI();
-            bind(DBI.class).toInstance(dbi);
-            /*
+	@Inject
+	private DBI dbi;
+
+	@Inject
+	MysqlTestingHelper helper;
+
+	@Inject
+	private Clock clock;
+
+	private DummySqlTest dao;
+
+	// private NotificationQueue queue;
+
+	private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+		final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+		final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
+		helper.startMysql();
+		helper.initDb(ddl);
+		helper.initDb(testDdl);
+	}
+
+	@BeforeSuite(alwaysRun = true)
+	public void setup() throws Exception {
+		startMysql();
+		dao = dbi.onDemand(DummySqlTest.class);
+	}
+
+	@BeforeTest
+	public void beforeTest() {
+		dbi.withHandle(new HandleCallback<Void>() {
+
+			@Override
+			public Void withHandle(Handle handle) throws Exception {
+				handle.execute("delete from notifications");
+				handle.execute("delete from claimed_notifications");
+				handle.execute("delete from dummy");
+				return null;
+			}
+		});
+		// Reset time to real value
+		((ClockMock) clock).resetDeltaFromReality();
+	}
+
+
+
+	/**
+	 * Test that we can post a notification in the future from a transaction and get the notification
+	 * callback with the correct key when the time is ready
+	 *
+	 * @throws InterruptedException
+	 */
+	@Test
+	public void testSimpleNotification() throws InterruptedException {
+
+		final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
+				new NotificationQueueHandler() {
+			@Override
+			public void handleReadyNotification(String notificationKey) {
+				synchronized (expectedNotifications) {
+					expectedNotifications.put(notificationKey, Boolean.TRUE);
+					expectedNotifications.notify();
+				}
+			}
+		},
+		getNotificationConfig(false, 100, 1, 10000));
+
+
+		queue.startQueue();
+
+		final UUID key = UUID.randomUUID();
+		final DummyObject obj = new DummyObject("foo", key);
+		final DateTime now = new DateTime();
+		final DateTime readyTime = now.plusMillis(2000);
+		final NotificationKey notificationKey = new NotificationKey() {
+			@Override
+			public String toString() {
+				return key.toString();
+			}
+		};
+		expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+		// Insert dummy to be processed in 2 sec'
+		dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+			@Override
+			public Void inTransaction(DummySqlTest transactional,
+					TransactionStatus status) throws Exception {
+
+				transactional.insertDummy(obj);
+				queue.recordFutureNotificationFromTransaction(transactional,
+						readyTime, notificationKey);
+				return null;
+			}
+		});
+
+		// Move time in the future after the notification effectiveDate
+		((ClockMock) clock).setDeltaFromReality(3000);
+
+		// Notification should have kicked but give it at least a sec' for thread scheduling
+		int nbTry = 1;
+		boolean success = false;
+		do {
+			synchronized(expectedNotifications) {
+				if (expectedNotifications.get(notificationKey.toString())) {
+					success = true;
+					break;
+				}
+				expectedNotifications.wait(1000);
+			}
+		} while (nbTry-- > 0);
+		assertEquals(success, true);
+	}
+
+	@Test
+	public void testManyNotifications() throws InterruptedException {
+		final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+				new NotificationQueueHandler() {
+			@Override
+			public void handleReadyNotification(String notificationKey) {
+				synchronized (expectedNotifications) {
+					expectedNotifications.put(notificationKey, Boolean.TRUE);
+					expectedNotifications.notify();
+				}
+			}
+		},
+		getNotificationConfig(false, 100, 10, 10000));
+
+
+		queue.startQueue();
+
+		final DateTime now = clock.getUTCNow();
+		final int MAX_NOTIFICATIONS = 100;
+		for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+			final int nextReadyTimeIncrementMs = 1000;
+
+			final UUID key = UUID.randomUUID();
+			final DummyObject obj = new DummyObject("foo", key);
+			final int currentIteration = i;
+
+			final NotificationKey notificationKey = new NotificationKey() {
+				@Override
+				public String toString() {
+					return key.toString();
+				}
+			};
+			expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+			dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+				@Override
+				public Void inTransaction(DummySqlTest transactional,
+						TransactionStatus status) throws Exception {
+
+					transactional.insertDummy(obj);
+					queue.recordFutureNotificationFromTransaction(transactional,
+							now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+					return null;
+				}
+			});
+
+			// Move time in the future after the notification effectiveDate
+			if (i == 0) {
+				((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+			} else {
+				((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+			}
+		}
+
+		// Wait a little longer since there are a lot of callback that need to happen
+		int nbTry = MAX_NOTIFICATIONS + 1;
+		boolean success = false;
+		do {
+			synchronized(expectedNotifications) {
+
+				Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+					@Override
+					public boolean apply(Boolean input) {
+						return input;
+					}
+				});
+
+				if (completed.size() == MAX_NOTIFICATIONS) {
+					success = true;
+					break;
+				}
+				//log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+				expectedNotifications.wait(1000);
+			}
+		} while (nbTry-- > 0);
+		assertEquals(success, true);
+
+	}
+
+	NotificationConfig getNotificationConfig(final boolean off,
+			final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+		return new NotificationConfig() {
+			@Override
+			public boolean isNotificationProcessingOff() {
+				return off;
+			}
+			@Override
+			public long getNotificationSleepTimeMs() {
+				return sleepTime;
+			}
+			@Override
+			public int getDaoMaxReadyEvents() {
+				return maxReadyEvents;
+			}
+			@Override
+			public long getDaoClaimTimeMs() {
+				return claimTimeMs;
+			}
+		};
+	}
+
+
+	public static class TestNotificationQueueModule extends AbstractModule {
+		@Override
+		protected void configure() {
+
+			bind(Clock.class).to(ClockMock.class);
+
+			final MysqlTestingHelper helper = new MysqlTestingHelper();
+			bind(MysqlTestingHelper.class).toInstance(helper);
+			DBI dbi = helper.getDBI();
+			bind(DBI.class).toInstance(dbi);
+			/*
             bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
             final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
             bind(DbiConfig.class).toInstance(config);
-            */
-        }
-    }
+			 */
+		}
+	}
 
 
 }