killbill-memoizeit

Changes

account/pom.xml 2(+1 -1)

api/pom.xml 2(+1 -1)

beatrix/pom.xml 24(+23 -1)

catalog/pom.xml 2(+1 -1)

invoice/pom.xml 2(+1 -1)

payment/pom.xml 2(+1 -1)

pom.xml 2(+1 -1)

util/pom.xml 2(+1 -1)

Details

account/pom.xml 2(+1 -1)

diff --git a/account/pom.xml b/account/pom.xml
index 8f2eeb1..0ac4f64 100644
--- a/account/pom.xml
+++ b/account/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-account</artifactId>
diff --git a/analytics/pom.xml b/analytics/pom.xml
index 18bd687..9ee654b 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-analytics</artifactId>
diff --git a/analytics/src/test/java/com/ning/billing/analytics/MockSubscription.java b/analytics/src/test/java/com/ning/billing/analytics/MockSubscription.java
index 7197ec6..f592c41 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/MockSubscription.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/MockSubscription.java
@@ -133,7 +133,8 @@ public class MockSubscription implements Subscription
     public List<SubscriptionTransition> getAllTransitions() {
         throw new UnsupportedOperationException();
      }
-    
+
+    @Override
     public SubscriptionTransition getPendingTransition() {
         throw new UnsupportedOperationException();
     }
@@ -147,4 +148,9 @@ public class MockSubscription implements Subscription
 	public DateTime getPaidThroughDate() {
 		throw new UnsupportedOperationException();
 	}
+
+    @Override
+    public SubscriptionTransition getPreviousTransition() {
+        return null;
+    }
 }

api/pom.xml 2(+1 -1)

diff --git a/api/pom.xml b/api/pom.xml
index afbca6a..ca5b331 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-api</artifactId>
diff --git a/api/src/main/java/com/ning/billing/catalog/api/Catalog.java b/api/src/main/java/com/ning/billing/catalog/api/Catalog.java
index 3c04a74..2b3609a 100644
--- a/api/src/main/java/com/ning/billing/catalog/api/Catalog.java
+++ b/api/src/main/java/com/ning/billing/catalog/api/Catalog.java
@@ -24,11 +24,11 @@ public interface Catalog {
     //
     public abstract String getCatalogName();
 
-    public abstract Currency[] getSupportedCurrencies(DateTime requestedDate);
+    public abstract Currency[] getSupportedCurrencies(DateTime requestedDate) throws CatalogApiException;
 
-	public abstract Product[] getProducts(DateTime requestedDate);
+	public abstract Product[] getProducts(DateTime requestedDate) throws CatalogApiException;
 	
-	public abstract Plan[] getPlans(DateTime requestedDate);
+	public abstract Plan[] getPlans(DateTime requestedDate) throws CatalogApiException;
 
 	
 	//
diff --git a/api/src/main/java/com/ning/billing/catalog/api/StaticCatalog.java b/api/src/main/java/com/ning/billing/catalog/api/StaticCatalog.java
index 0db7db5..c93fde8 100644
--- a/api/src/main/java/com/ning/billing/catalog/api/StaticCatalog.java
+++ b/api/src/main/java/com/ning/billing/catalog/api/StaticCatalog.java
@@ -25,13 +25,13 @@ public interface StaticCatalog {
     //
     public abstract String getCatalogName();
     
-    public abstract Date getEffectiveDate();
+    public abstract Date getEffectiveDate() throws CatalogApiException;
 
-    public abstract Currency[] getCurrentSupportedCurrencies();
+    public abstract Currency[] getCurrentSupportedCurrencies() throws CatalogApiException;
 
-	public abstract Product[] getCurrentProducts();
+	public abstract Product[] getCurrentProducts() throws CatalogApiException;
 	
-	public abstract Plan[] getCurrentPlans();
+	public abstract Plan[] getCurrentPlans() throws CatalogApiException;
 	
 	//
 	// Find a plan
diff --git a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
index 42399ce..1b6f3e2 100644
--- a/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
+++ b/api/src/main/java/com/ning/billing/config/EntitlementConfig.java
@@ -33,7 +33,7 @@ public interface EntitlementConfig {
     @Default("500")
     public long getNotificationSleepTimeMs();
 
-    @Config("killbill.entitlement.engine.events.off")
+    @Config("killbill.notifications.off")
     @Default("false")
     public boolean isEventProcessingOff();
 }
diff --git a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
index a2b4270..78cc02f 100644
--- a/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
+++ b/api/src/main/java/com/ning/billing/config/InvoiceConfig.java
@@ -33,7 +33,7 @@ public interface InvoiceConfig {
     @Default("500")
     public long getNotificationSleepTimeMs();
 
-    @Config("killbill.invoice.engine.events.off")
+    @Config("killbill.notifications.off")
     @Default("false")
     public boolean isEventProcessingOff();
 }
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/EntitlementService.java b/api/src/main/java/com/ning/billing/entitlement/api/EntitlementService.java
index 53854d1..28084b2 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/EntitlementService.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/EntitlementService.java
@@ -18,7 +18,6 @@ package com.ning.billing.entitlement.api;
 
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
-import com.ning.billing.entitlement.api.test.EntitlementTestApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.lifecycle.KillbillService;
 
@@ -31,7 +30,5 @@ public interface EntitlementService extends KillbillService {
 
     public EntitlementBillingApi getBillingApi();
 
-    public EntitlementTestApi getTestApi();
-
     public EntitlementMigrationApi getMigrationApi();
 }
diff --git a/api/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java b/api/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
index 41ecd78..5c626fc 100644
--- a/api/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
+++ b/api/src/main/java/com/ning/billing/entitlement/api/user/Subscription.java
@@ -64,7 +64,7 @@ public interface Subscription {
     public String getCurrentPriceList();
 
     public PlanPhase getCurrentPhase();
-    
+
     public DateTime getChargedThroughDate();
 
     public DateTime getPaidThroughDate();
@@ -76,4 +76,5 @@ public interface Subscription {
 
     public SubscriptionTransition getPendingTransition();
 
+    public SubscriptionTransition getPreviousTransition();
 }
diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index 3209c8c..f7825e9 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -29,7 +29,8 @@ public enum ErrorCode {
      *
      */
     /* Generic through APIs */
-    ENT_INVALID_REQUESTED_DATE(1001, "Requested in the future is not allowed : %s"),
+    ENT_INVALID_REQUESTED_FUTURE_DATE(1001, "Requested date %s in the future is not allowed"),
+    ENT_INVALID_REQUESTED_DATE(1001, "Requested date %s is not allowed to be prior to the previous transition %s"),
 
     /* Creation */
     ENT_CREATE_BAD_PHASE(1011, "Can't create plan initial phase %s"),
@@ -93,12 +94,12 @@ public enum ErrorCode {
      */
     CAT_NO_CATALOG_FOR_GIVEN_DATE(2050, "There is no catalog version that applies for the given date '%s'"),
     CAT_NO_CATALOG_ENTRIES_FOR_GIVEN_DATE(2051, "The are no catalog entries that apply for the given date '%s'"),
-    CAT_CATALOG_NAME_MISMATCH(2052, "The catalog name '%s' does not match the name of the catalog we are trying to add '%s'"),  
+    CAT_CATALOG_NAME_MISMATCH(2052, "The catalog name '%s' does not match the name of the catalog we are trying to add '%s'"),
     /*
      * Billing Alignment
      */
     CAT_INVALID_BILLING_ALIGNMENT(2060, "Invalid billing alignment '%s'"),
-    
+
    /*
     *
     * Range 3000 : ACCOUNT
@@ -120,7 +121,7 @@ public enum ErrorCode {
     TAG_DEFINITION_ALREADY_EXISTS(3901, "The tag definition name already exists (name: %s)"),
     TAG_DEFINITION_DOES_NOT_EXIST(3902, "The tag definition name does not exist (name: %s)"),
     TAG_DEFINITION_IN_USE(3903, "The tag definition name is currently in use (name: %s)"),
-   
+
    /*
     *
     * Range 4000: INVOICE
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
index 0977071..3b2ebed 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.payment.api;
 
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
 import com.google.common.base.Strings;
 
 
@@ -54,11 +57,12 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
     private final String baid;
     private final String email;
 
-    public PaypalPaymentMethodInfo(String id,
-                                   String accountId,
-                                   Boolean defaultMethod,
-                                   String baid,
-                                   String email) {
+    @JsonCreator
+    public PaypalPaymentMethodInfo(@JsonProperty("id") String id,
+                                   @JsonProperty("accountId") String accountId,
+                                   @JsonProperty("defaultMethod") Boolean defaultMethod,
+                                   @JsonProperty("baid") String baid,
+                                   @JsonProperty("email") String email) {
         super(id, accountId, defaultMethod, TYPE);
 
         if (Strings.isNullOrEmpty(baid) || Strings.isNullOrEmpty(email)) {

beatrix/pom.xml 24(+23 -1)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index c964ba4..64a7b38 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-beatrix</artifactId>
@@ -98,11 +98,33 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>management</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>management-dbfiles</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>runtime</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <groups>fast,slow</groups>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <executions>
                     <execution>
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/MockModule.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/MockModule.java
index 54f6dae..0cb1b01 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/MockModule.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/MockModule.java
@@ -36,6 +36,7 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.glue.CatalogModule;
 import com.ning.billing.dbi.DBIProvider;
 import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.entitlement.api.EntitlementService;
 import com.ning.billing.entitlement.glue.EntitlementModule;
 import com.ning.billing.invoice.api.InvoiceService;
@@ -66,9 +67,19 @@ public class MockModule extends AbstractModule {
         bind(Clock.class).to(ClockMock.class).asEagerSingleton();
         bind(ClockMock.class).asEagerSingleton();
         bind(Lifecycle.class).to(SubsetDefaultLifecycle.class).asEagerSingleton();
-        bind(IDBI.class).toProvider(DBIProvider.class).asEagerSingleton();
-        final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
-        bind(DbiConfig.class).toInstance(config);
+
+
+        final MysqlTestingHelper helper = new MysqlTestingHelper();
+        bind(MysqlTestingHelper.class).toInstance(helper);
+        if (helper.isUsingLocalInstance()) {
+            bind(IDBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+        } else {
+            final IDBI dbi = helper.getDBI();
+            bind(IDBI.class).toInstance(dbi);
+        }
+
         install(new GlobalLockerModule());
         install(new BusModule());
         install(new NotificationQueueModule());
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
index ecf16ae..d04d768 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
@@ -16,17 +16,28 @@
 
 package com.ning.billing.beatrix.integration;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceItem;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -92,6 +103,9 @@ public class TestBasic {
     @Inject
     private AccountService accountService;
 
+    @Inject
+    private MysqlTestingHelper helper;
+
     private EntitlementUserApi entitlementUserApi;
 
     private InvoiceUserApi invoiceUserApi;
@@ -102,9 +116,30 @@ public class TestBasic {
 
 
 
+    private void setupMySQL() throws IOException
+    {
+
+
+        final String accountDdl = IOUtils.toString(TestBasic.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
+        final String entitlementDdl = IOUtils.toString(TestBasic.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
+        final String invoiceDdl = IOUtils.toString(TestBasic.class.getResourceAsStream("/com/ning/billing/invoice/ddl.sql"));
+        final String paymentDdl = IOUtils.toString(TestBasic.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
+        final String utilDdl = IOUtils.toString(TestBasic.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+
+        helper.startMysql();
+
+        helper.initDb(accountDdl);
+        helper.initDb(entitlementDdl);
+        helper.initDb(invoiceDdl);
+        helper.initDb(paymentDdl);
+        helper.initDb(utilDdl);
+    }
+
     @BeforeSuite(alwaysRun = true)
     public void setup() throws Exception{
 
+        setupMySQL();
+
         /**
          * Initialize lifecyle for subset of services
          */
@@ -113,6 +148,8 @@ public class TestBasic {
         busService.getBus().register(busHandler);
         lifecycle.fireStartupSequencePostEventRegistration();
 
+
+
         /**
          * Retrieve APIs
          */
@@ -150,7 +187,7 @@ public class TestBasic {
             public Void inTransaction(Handle h, TransactionStatus status)
                     throws Exception {
                 h.execute("truncate table accounts");
-                h.execute("truncate table events");
+                h.execute("truncate table entitlement_events");
                 h.execute("truncate table subscriptions");
                 h.execute("truncate table bundles");
                 h.execute("truncate table notifications");
@@ -169,60 +206,99 @@ public class TestBasic {
         });
     }
 
-    private DateTime checkAndGetCTD(UUID subscriptionId) {
-
+    private void verifyTestResult(UUID accountId, UUID subscriptionId,
+                                  DateTime startDate, DateTime endDate,
+                                  BigDecimal amount, DateTime chargeThroughDate) {
         SubscriptionData subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscriptionId);
+
+        List<InvoiceItem> invoiceItems = invoiceUserApi.getInvoiceItemsByAccount(accountId);
+        boolean wasFound = false;
+
+        Iterator<InvoiceItem> invoiceItemIterator = invoiceItems.iterator();
+        while (invoiceItemIterator.hasNext()) {
+            InvoiceItem item = invoiceItemIterator.next();
+            if (item.getStartDate().compareTo(removeMillis(startDate)) == 0) {
+                if (item.getEndDate().compareTo(removeMillis(endDate)) == 0) {
+                    if (item.getAmount().compareTo(amount) == 0) {
+                        wasFound = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        assertTrue(wasFound);
+
         DateTime ctd = subscription.getChargedThroughDate();
         assertNotNull(ctd);
         log.info("Checking CTD: " + ctd.toString() + "; clock is " + clock.getUTCNow().toString());
         assertTrue(clock.getUTCNow().isBefore(ctd));
-        return ctd;
+        assertTrue(ctd.compareTo(removeMillis(chargeThroughDate)) == 0);
+    }
+
+    private DateTime removeMillis(DateTime input) {
+        return input.toMutableDateTime().millisOfSecond().set(0).toDateTime();
     }
 
     @Test(groups = "fast", enabled = false)
     public void testBasePlanCompleteWithBillingDayInPast() throws Exception {
-        testBasePlanComplete(clock.getUTCNow().minusDays(1).getDayOfMonth());
+        DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
+        testBasePlanComplete(startDate, 31, false);
     }
 
     @Test(groups = "fast", enabled = false)
     public void testBasePlanCompleteWithBillingDayPresent() throws Exception {
-        testBasePlanComplete(clock.getUTCNow().getDayOfMonth());
+        DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
+        testBasePlanComplete(startDate, 1, false);
     }
 
     @Test(groups = "fast", enabled = false)
     public void testBasePlanCompleteWithBillingDayAlignedWithTrial() throws Exception {
-        testBasePlanComplete(clock.getUTCNow().plusDays(30).getDayOfMonth());
+        DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
+        testBasePlanComplete(startDate, 2, false);
     }
 
     @Test(groups = "fast", enabled = false)
     public void testBasePlanCompleteWithBillingDayInFuture() throws Exception {
-        testBasePlanComplete(clock.getUTCNow().plusDays(2).getDayOfMonth());
+        DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
+        testBasePlanComplete(startDate, 3, true);
     }
 
-
     private void waitForDebug() throws Exception {
         Thread.sleep(600000);
     }
 
-
     @Test(groups = "stress", enabled = false)
     public void stressTest() throws Exception {
-        final int maxIterations = 3;
+        final int maxIterations = 7;
         int curIteration = maxIterations;
         for (curIteration = 0; curIteration < maxIterations; curIteration++) {
             log.info("################################  ITERATION " + curIteration + "  #########################");
-            setupTest();
             Thread.sleep(1000);
+            setupTest();
             testBasePlanCompleteWithBillingDayPresent();
+            Thread.sleep(1000);
+            setupTest();
+            testBasePlanCompleteWithBillingDayInPast();
+            Thread.sleep(1000);
+            setupTest();
+            testBasePlanCompleteWithBillingDayAlignedWithTrial();
+            Thread.sleep(1000);
+            setupTest();
+            testBasePlanCompleteWithBillingDayInFuture();
         }
     }
 
-    private void testBasePlanComplete(int billingDay) throws Exception {
-        long DELAY = 5000;
+    private void testBasePlanComplete(DateTime initialCreationDate, int billingDay,
+                                      boolean proRationExpected) throws Exception {
+        long DELAY = 5000 * 10;
 
         Account account = accountUserApi.createAccount(getAccountData(billingDay), null, null);
+        UUID accountId = account.getId();
         assertNotNull(account);
 
+        // set clock to the initial start date
+        clock.setDeltaFromReality(initialCreationDate.getMillis() - DateTime.now().getMillis());
         SubscriptionBundle bundle = entitlementUserApi.createBundleForAccount(account.getId(), "whatever");
 
         String productName = "Shotgun";
@@ -238,17 +314,16 @@ public class TestBasic {
                 new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSetName, null), null);
         assertNotNull(subscription);
 
-
-        //waitForDebug();
-
         assertTrue(busHandler.isCompleted(DELAY));
         log.info("testSimple passed first busHandler checkpoint.");
 
         //
         // VERIFY CTD HAS BEEN SET
         //
-
-        checkAndGetCTD(subscription.getId());
+        DateTime startDate = subscription.getCurrentPhaseStart();
+        DateTime endDate = startDate.plusDays(30);
+        BigDecimal price = subscription.getCurrentPhase().getFixedPrice().getPrice(Currency.USD);
+        verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
 
         //
         // CHANGE PLAN IMMEDIATELY AND EXPECT BOTH EVENTS: NextEvent.CHANGE NextEvent.INVOICE
@@ -267,7 +342,10 @@ public class TestBasic {
         //
         // VERIFY AGAIN CTD HAS BEEN SET
         //
-        DateTime ctd = checkAndGetCTD(subscription.getId());
+        startDate = subscription.getCurrentPhaseStart();
+        endDate = startDate.plusMonths(1);
+        price = subscription.getCurrentPhase().getFixedPrice().getPrice(Currency.USD);
+        verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
 
         //
         // MOVE TIME TO AFTER TRIAL AND EXPECT BOTH EVENTS :  NextEvent.PHASE NextEvent.INVOICE
@@ -275,6 +353,12 @@ public class TestBasic {
         busHandler.pushExpectedEvent(NextEvent.PHASE);
         busHandler.pushExpectedEvent(NextEvent.INVOICE);
         busHandler.pushExpectedEvent(NextEvent.PAYMENT);
+
+        if (proRationExpected) {
+            busHandler.pushExpectedEvent(NextEvent.INVOICE);
+            busHandler.pushExpectedEvent(NextEvent.PAYMENT);
+        }
+
         clock.setDeltaFromReality(AT_LEAST_ONE_MONTH_MS);
 
         assertTrue(busHandler.isCompleted(DELAY));
@@ -307,13 +391,14 @@ public class TestBasic {
         // MOVE TIME AFTER NEXT BILL CYCLE DAY AND EXPECT EVENT : NextEvent.INVOICE
         //
         int maxCycles = 3;
-        DateTime lastCtd = null;
+        startDate = endDate;
+        endDate = startDate.plusMonths(1);
         do {
             busHandler.pushExpectedEvent(NextEvent.INVOICE);
             busHandler.pushExpectedEvent(NextEvent.PAYMENT);
             clock.addDeltaFromReality(AT_LEAST_ONE_MONTH_MS + 1000);
             assertTrue(busHandler.isCompleted(DELAY));
-            lastCtd = checkAndGetCTD(subscription.getId());
+            verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
         } while (maxCycles-- > 0);
 
         //
@@ -324,7 +409,7 @@ public class TestBasic {
 
         // MOVE AFTER CANCEL DATE AND EXPECT EVENT : NextEvent.CANCEL
         busHandler.pushExpectedEvent(NextEvent.CANCEL);
-        Interval it = new Interval(clock.getUTCNow(), lastCtd);
+        Interval it = new Interval(clock.getUTCNow(), endDate);
         clock.addDeltaFromReality(it.toDurationMillis());
         assertTrue(busHandler.isCompleted(DELAY));
 
@@ -335,12 +420,15 @@ public class TestBasic {
         clock.addDeltaFromReality(AT_LEAST_ONE_MONTH_MS + 1000);
         assertTrue(busHandler.isCompleted(DELAY));
 
-
         subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscription.getId());
-        lastCtd = subscription.getChargedThroughDate();
+        DateTime lastCtd = subscription.getChargedThroughDate();
         assertNotNull(lastCtd);
         log.info("Checking CTD: " + lastCtd.toString() + "; clock is " + clock.getUTCNow().toString());
         assertTrue(lastCtd.isBefore(clock.getUTCNow()));
+
+        // The invoice system is still working to verify there is nothing to do
+        Thread.sleep(DELAY);
+        log.info("TEST PASSED !");
     }
 
     @Test(enabled=false)
@@ -380,7 +468,6 @@ public class TestBasic {
 
     }
 
-
     protected AccountData getAccountData(final int billingDay) {
 
         final String someRandomKey = RandomStringUtils.randomAlphanumeric(10);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/lifecycle/TestLifecycle.java b/beatrix/src/test/java/com/ning/billing/beatrix/lifecycle/TestLifecycle.java
index 9bf7c10..f60d61f 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/lifecycle/TestLifecycle.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/lifecycle/TestLifecycle.java
@@ -121,7 +121,7 @@ public class TestLifecycle {
 
 
 
-    @BeforeClass
+    @BeforeClass(alwaysRun=true)
     public void setup() {
         final Injector g = Guice.createInjector(Stage.DEVELOPMENT, new TestLifecycleModule());
         s1 = g.getInstance(Service1.class);

catalog/pom.xml 2(+1 -1)

diff --git a/catalog/pom.xml b/catalog/pom.xml
index 635fac3..5d19c5b 100644
--- a/catalog/pom.xml
+++ b/catalog/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-catalog</artifactId>
diff --git a/catalog/src/main/java/com/ning/billing/catalog/VersionedCatalog.java b/catalog/src/main/java/com/ning/billing/catalog/VersionedCatalog.java
index f652347..ef9b125 100644
--- a/catalog/src/main/java/com/ning/billing/catalog/VersionedCatalog.java
+++ b/catalog/src/main/java/com/ning/billing/catalog/VersionedCatalog.java
@@ -66,24 +66,16 @@ public class VersionedCatalog extends ValidatingConfig<StandaloneCatalog> implem
 
 	public VersionedCatalog(Clock clock) {
 		this.clock = clock;
-		StandaloneCatalog baseline = new StandaloneCatalog(new Date(0)); // init with an empty catalog may need to 
-													 // populate some empty pieces here to make validation work
-		try {
-			add(baseline);
-		} catch (CatalogApiException e) {
-			// This should never happen
-			log.error("This error should never happpen", e);
-		} 
 	}
 
 	//
 	// Private methods
 	//
-	private StandaloneCatalog versionForDate(DateTime date) {
+	private StandaloneCatalog versionForDate(DateTime date) throws CatalogApiException {
 		return versions.get(indexOfVersionForDate(date.toDate()));
 	}
 
-	private List<StandaloneCatalog> versionsBeforeDate(Date date) {
+	private List<StandaloneCatalog> versionsBeforeDate(Date date) throws CatalogApiException {
 		List<StandaloneCatalog> result = new ArrayList<StandaloneCatalog>();
 		int index = indexOfVersionForDate(date);
 		for(int i = 0; i <= index; i++) {
@@ -92,14 +84,14 @@ public class VersionedCatalog extends ValidatingConfig<StandaloneCatalog> implem
 		return result;
 	}
 
-	private int indexOfVersionForDate(Date date) {
-		for(int i = 1; i < versions.size(); i++) {
+	private int indexOfVersionForDate(Date date) throws CatalogApiException {
+		for(int i = versions.size() - 1; i >= 0; i--) {
 			StandaloneCatalog c = versions.get(i);
-			if(c.getEffectiveDate().getTime() > date.getTime()) {
-				return i - 1;
+			if(c.getEffectiveDate().getTime() < date.getTime()) {
+				return i;
 			}
 		}
-		return versions.size() - 1;
+		throw new CatalogApiException(ErrorCode.CAT_NO_CATALOG_FOR_GIVEN_DATE, date.toString());
 	}
 	
 	private class PlanRequestWrapper {
@@ -205,17 +197,17 @@ public class VersionedCatalog extends ValidatingConfig<StandaloneCatalog> implem
 	}
 
 	@Override
-	public DefaultProduct[] getProducts(DateTime requestedDate) {
+	public DefaultProduct[] getProducts(DateTime requestedDate) throws CatalogApiException {
 		return versionForDate(requestedDate).getCurrentProducts();
 	}
 
 	@Override
-	public Currency[] getSupportedCurrencies(DateTime requestedDate) {
+	public Currency[] getSupportedCurrencies(DateTime requestedDate) throws CatalogApiException {
 		return versionForDate(requestedDate).getCurrentSupportedCurrencies();
 	}
 
 	@Override
-	public DefaultPlan[] getPlans(DateTime requestedDate) {
+	public DefaultPlan[] getPlans(DateTime requestedDate) throws CatalogApiException {
 		return versionForDate(requestedDate).getCurrentPlans();
 	}
 
@@ -350,22 +342,22 @@ public class VersionedCatalog extends ValidatingConfig<StandaloneCatalog> implem
 	// Static catalog API
 	//
 	@Override
-	public Date getEffectiveDate() {
+	public Date getEffectiveDate() throws CatalogApiException {
 		return versionForDate(clock.getUTCNow()).getEffectiveDate();
 	}
 
 	@Override
-	public Currency[] getCurrentSupportedCurrencies() {
+	public Currency[] getCurrentSupportedCurrencies() throws CatalogApiException {
 		return versionForDate(clock.getUTCNow()).getCurrentSupportedCurrencies();
 	}
 
 	@Override
-	public Product[] getCurrentProducts() {
+	public Product[] getCurrentProducts() throws CatalogApiException {
 		return versionForDate(clock.getUTCNow()).getCurrentProducts() ;
 	}
 
 	@Override
-	public Plan[] getCurrentPlans() {
+	public Plan[] getCurrentPlans() throws CatalogApiException {
 		return versionForDate(clock.getUTCNow()).getCurrentPlans();
 	}
 
diff --git a/catalog/src/test/java/com/ning/billing/catalog/io/TestVersionedCatalogLoader.java b/catalog/src/test/java/com/ning/billing/catalog/io/TestVersionedCatalogLoader.java
index 9708b99..0d5ac66 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/io/TestVersionedCatalogLoader.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/io/TestVersionedCatalogLoader.java
@@ -121,9 +121,8 @@ public class TestVersionedCatalogLoader {
 	@Test(enabled=true)
 	public void testLoad() throws MalformedURLException, IOException, SAXException, InvalidConfigException, JAXBException, TransformerException, URISyntaxException, ServiceException {
 		VersionedCatalog c = loader.load(Resources.getResource("versionedCatalog").toString());
-		assertEquals(4, c.size());
+		assertEquals(3, c.size());
 		Iterator<StandaloneCatalog> it = c.iterator();
-		it.next(); //discard the baseline
 		DateTime dt = new DateTime("2011-01-01T00:00:00+00:00");
 		assertEquals(dt.toDate(),it.next().getEffectiveDate());
 		dt = new DateTime("2011-02-02T00:00:00+00:00");
diff --git a/catalog/src/test/java/com/ning/billing/catalog/TestVersionedCatalog.java b/catalog/src/test/java/com/ning/billing/catalog/TestVersionedCatalog.java
index ddd623e..fbc99df 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/TestVersionedCatalog.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/TestVersionedCatalog.java
@@ -35,6 +35,7 @@ import org.testng.annotations.Test;
 import org.xml.sax.SAXException;
 
 import com.google.common.io.Resources;
+import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.CatalogApiException;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.catalog.api.InvalidConfigException;
@@ -53,14 +54,14 @@ public class TestVersionedCatalog {
 		vc = loader.load(Resources.getResource("versionedCatalog").toString());
 	}
 
-	@Test(enabled=true)
+	@Test(groups={"fast"},enabled=true)
 	public void testAddCatalog() throws MalformedURLException, IOException, SAXException, InvalidConfigException, JAXBException, TransformerException, URISyntaxException, ServiceException, CatalogApiException {
 		vc.add(new StandaloneCatalog(new Date()));
-		assertEquals(5, vc.size());
+		assertEquals(4, vc.size());
 	}
 	
 		
-	@Test(enabled=true)
+	@Test(groups={"fast"},enabled=true)
 	public void testFindPlanWithDates() throws Exception {
 		DateTime dt0= new DateTime("2010-01-01T00:00:00+00:00");
 		DateTime dt1 = new DateTime("2011-01-01T00:01:00+00:00");
@@ -97,6 +98,18 @@ public class TestVersionedCatalog {
 		Assert.assertEquals(exSubPlan214.getAllPhases()[1].getRecurringPrice().getPrice(Currency.USD), new BigDecimal("2.0"));
 		Assert.assertEquals(exSubPlan3.getAllPhases()[1].getRecurringPrice().getPrice(Currency.USD), new BigDecimal("2.0"));
 
-		
+	}
+	
+	@Test(groups={"fast"},enabled=true)
+	public void testErrorOnDateTooEarly() {
+		DateTime dt0= new DateTime("1977-01-01T00:00:00+00:00");
+		try {
+			vc.findPlan("foo", dt0);
+			Assert.fail("Date is too early an exception should have been thrown");
+		} catch (CatalogApiException e) {
+			e.printStackTrace();
+			Assert.assertEquals(e.getCode(), ErrorCode.CAT_NO_CATALOG_FOR_GIVEN_DATE.getCode());
+
+		}
 	}
 }
diff --git a/entitlement/pom.xml b/entitlement/pom.xml
index 83388f8..f8a781f 100644
--- a/entitlement/pom.xml
+++ b/entitlement/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-entitlement</artifactId>
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
index d2af1a6..3143b7c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
@@ -99,10 +99,7 @@ public class SubscriptionApiService {
 
             DateTime now = clock.getUTCNow();
             requestedDate = (requestedDate != null) ? DefaultClock.truncateMs(requestedDate) : now;
-            // STEPH needs to check if requestedDate is before last 'erasable event'?
-            if (requestedDate != null && requestedDate.isAfter(now)) {
-                throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
-            }
+            validateRequestedDateOnChangeOrCancel(subscription, now, requestedDate);
 
             Plan currentPlan = subscription.getCurrentPlan();
             PlanPhaseSpecifier planPhase = new PlanPhaseSpecifier(currentPlan.getProduct().getName(),
@@ -160,6 +157,7 @@ public class SubscriptionApiService {
         subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId()), catalogService.getFullCatalog());
     }
 
+
     public void changePlan(SubscriptionData subscription, String productName, BillingPeriod term,
             String priceList, DateTime requestedDate)
         throws EntitlementUserApiException {
@@ -169,10 +167,7 @@ public class SubscriptionApiService {
 
             DateTime now = clock.getUTCNow();
             requestedDate = (requestedDate != null) ? DefaultClock.truncateMs(requestedDate) : now;
-            // STEPH needs to check if requestedDate is before last 'erasable event'?
-            if (requestedDate != null && requestedDate.isAfter(now)) {
-                throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE, requestedDate.toString());
-            }
+            validateRequestedDateOnChangeOrCancel(subscription, now, requestedDate);
 
             String currentPriceList = subscription.getCurrentPriceList();
 
@@ -207,7 +202,7 @@ public class SubscriptionApiService {
             PriceList newPriceList = planChangeResult.getNewPriceList();
 
             Plan newPlan = catalogService.getFullCatalog().findPlan(productName, term, newPriceList.getName(), requestedDate);
-            DateTime effectiveDate = subscription.getPlanChangeEffectiveDate(policy, now);
+            DateTime effectiveDate = subscription.getPlanChangeEffectiveDate(policy, requestedDate);
 
             TimedPhase currentTimedPhase = planAligner.getCurrentTimedPhaseOnChange(subscription, newPlan, newPriceList.getName(), requestedDate, effectiveDate);
 
@@ -237,4 +232,18 @@ public class SubscriptionApiService {
             throw new EntitlementUserApiException(e);
         }
     }
+
+    private void validateRequestedDateOnChangeOrCancel(SubscriptionData subscription, DateTime now, DateTime requestedDate)
+        throws EntitlementUserApiException {
+
+        if (requestedDate.isAfter(now) ) {
+            throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
+        }
+
+        SubscriptionTransition previousTransition = subscription.getPreviousTransition();
+        if (previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
+            throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
+                    requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
+        }
+    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 342c9cc..f4bb22b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -104,29 +104,29 @@ public class SubscriptionData implements Subscription {
 
     @Override
     public SubscriptionState getState() {
-        return (getLatestTransition() == null) ? null : getLatestTransition().getNextState();
+        return (getPreviousTransition() == null) ? null : getPreviousTransition().getNextState();
     }
 
     @Override
     public PlanPhase getCurrentPhase() {
-        return (getLatestTransition() == null) ? null : getLatestTransition().getNextPhase();
+        return (getPreviousTransition() == null) ? null : getPreviousTransition().getNextPhase();
     }
 
 
     @Override
     public Plan getCurrentPlan() {
-        return (getLatestTransition() == null) ? null : getLatestTransition().getNextPlan();
+        return (getPreviousTransition() == null) ? null : getPreviousTransition().getNextPlan();
     }
 
     @Override
     public String getCurrentPriceList() {
-        return (getLatestTransition() == null) ? null : getLatestTransition().getNextPriceList();
+        return (getPreviousTransition() == null) ? null : getPreviousTransition().getNextPriceList();
     }
 
 
     @Override
     public DateTime getEndDate() {
-        SubscriptionTransition latestTransition = getLatestTransition();
+        SubscriptionTransition latestTransition = getPreviousTransition();
         if (latestTransition.getNextState() == SubscriptionState.CANCELLED) {
             return latestTransition.getEffectiveTransitionTime();
         }
@@ -188,6 +188,7 @@ public class SubscriptionData implements Subscription {
         return result;
     }
 
+    @Override
     public SubscriptionTransition getPendingTransition() {
         if (transitions == null) {
             return null;
@@ -200,7 +201,7 @@ public class SubscriptionData implements Subscription {
         return null;
     }
 
-    public SubscriptionTransition getLatestTransition() {
+    public SubscriptionTransition getPreviousTransition() {
 
         if (transitions == null) {
             return null;
@@ -240,10 +241,12 @@ public class SubscriptionData implements Subscription {
         return bundleStartDate;
     }
 
+    @Override
     public DateTime getChargedThroughDate() {
         return chargedThroughDate;
     }
 
+    @Override
     public DateTime getPaidThroughDate() {
         return paidThroughDate;
     }
@@ -354,7 +357,7 @@ public class SubscriptionData implements Subscription {
         transitions = new LinkedList<SubscriptionTransitionData>();
         Plan previousPlan = null;
         PlanPhase previousPhase = null;
-        
+
         for (final EntitlementEvent cur : events) {
 
             if (!cur.isActive() || cur.getActiveVersion() < activeVersion) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index ce70ba7..0f5375b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -31,8 +31,6 @@ import com.ning.billing.entitlement.api.billing.DefaultEntitlementBillingApi;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
-import com.ning.billing.entitlement.api.test.DefaultEntitlementTestApi;
-import com.ning.billing.entitlement.api.test.EntitlementTestApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -65,7 +63,6 @@ public class Engine implements EventListener, EntitlementService {
     private final PlanAligner planAligner;
     private final EntitlementUserApi userApi;
     private final EntitlementBillingApi billingApi;
-    private final EntitlementTestApi testApi;
     private final EntitlementMigrationApi migrationApi;
     private final Bus eventBus;
     private final EntitlementConfig config;
@@ -76,7 +73,7 @@ public class Engine implements EventListener, EntitlementService {
     @Inject
     public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
             EntitlementConfig config, DefaultEntitlementUserApi userApi,
-            DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
+            DefaultEntitlementBillingApi billingApi,
             DefaultEntitlementMigrationApi migrationApi, Bus eventBus,
             NotificationQueueService notificationQueueService) {
         super();
@@ -84,7 +81,6 @@ public class Engine implements EventListener, EntitlementService {
         this.dao = dao;
         this.planAligner = planAligner;
         this.userApi = userApi;
-        this.testApi = testApi;
         this.billingApi = billingApi;
         this.migrationApi = migrationApi;
         this.config = config;
@@ -105,7 +101,7 @@ public class Engine implements EventListener, EntitlementService {
                     NOTIFICATION_QUEUE_NAME,
                     new NotificationQueueHandler() {
                 @Override
-                public void handleReadyNotification(String notificationKey) {
+                public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
                     EntitlementEvent event = dao.getEventById(UUID.fromString(notificationKey));
                     if (event == null) {
                         log.warn("Failed to extract event for notification key {}", notificationKey);
@@ -161,11 +157,6 @@ public class Engine implements EventListener, EntitlementService {
 
 
     @Override
-    public EntitlementTestApi getTestApi() {
-        return testApi;
-    }
-
-    @Override
     public EntitlementMigrationApi getMigrationApi() {
         return migrationApi;
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
index c27a8f4..4f0dfec 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
@@ -27,8 +27,6 @@ import com.ning.billing.entitlement.api.billing.DefaultEntitlementBillingApi;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
 import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
-import com.ning.billing.entitlement.api.test.DefaultEntitlementTestApi;
-import com.ning.billing.entitlement.api.test.EntitlementTestApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionApiService;
@@ -57,7 +55,6 @@ public class EntitlementModule extends AbstractModule {
         bind(Engine.class).asEagerSingleton();
         bind(PlanAligner.class).asEagerSingleton();
         bind(MigrationPlanAligner.class).asEagerSingleton();
-        bind(EntitlementTestApi.class).to(DefaultEntitlementTestApi.class).asEagerSingleton();
         bind(EntitlementUserApi.class).to(DefaultEntitlementUserApi.class).asEagerSingleton();
         bind(EntitlementBillingApi.class).to(DefaultEntitlementBillingApi.class).asEagerSingleton();
         bind(EntitlementMigrationApi.class).to(DefaultEntitlementMigrationApi.class).asEagerSingleton();
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 55ad7f4..dfdc746 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -1,5 +1,5 @@
-DROP TABLE IF EXISTS events;
-CREATE TABLE events (
+DROP TABLE IF EXISTS entitlement_events;
+CREATE TABLE entitlement_events (
     id int(11) unsigned NOT NULL AUTO_INCREMENT,
     event_id char(36) NOT NULL,
     event_type varchar(9) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 704e2c7..10f565d 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -15,14 +15,14 @@ getEventById(event_id) ::= <<
       , plist_name
       , current_version
       , is_active  
-  from events
+  from entitlement_events
   where
       event_id = :event_id
   ;
 >>
 
 insertEvent() ::= <<
-    insert into events (
+    insert into entitlement_events (
       event_id
       , event_type
       , user_type
@@ -54,14 +54,14 @@ insertEvent() ::= <<
 >>
 
 removeEvents(subscription_id) ::= <<
-    delete from events
+    delete from entitlement_events
       where
     subscription_id = :subscription_id
     ;
 >>
 
 unactiveEvent(event_id, now) ::= <<
-    update events
+    update entitlement_events
     set
       is_active = 0
       , updated_dt = :now
@@ -71,7 +71,7 @@ unactiveEvent(event_id, now) ::= <<
 >>
 
 reactiveEvent(event_id, now) ::= <<
-    update events
+    update entitlement_events
     set
       is_active = 1
       , updated_dt = :now
@@ -95,7 +95,7 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
       , plist_name
       , current_version
       , is_active
-    from events
+    from entitlement_events
     where
       subscription_id = :subscription_id
       and is_active = 1
@@ -123,7 +123,7 @@ getEventsForSubscription(subscription_id) ::= <<
       , plist_name
       , current_version
       , is_active
-    from events
+    from entitlement_events
     where
       subscription_id = :subscription_id
     order by
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadSubscription.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadSubscription.java
index e55ba79..98cc376 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadSubscription.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadSubscription.java
@@ -47,98 +47,103 @@ public class BrainDeadSubscription implements Subscription {
 			throws EntitlementUserApiException {
 		throw new UnsupportedOperationException();
 
-		
+
 	}
 
 	@Override
 	public void pause() throws EntitlementUserApiException {
 		throw new UnsupportedOperationException();
 
-		
+
 	}
 
 	@Override
 	public void resume() throws EntitlementUserApiException {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public UUID getId() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public UUID getBundleId() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public SubscriptionState getState() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public DateTime getStartDate() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public DateTime getEndDate() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public Plan getCurrentPlan() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public String getCurrentPriceList() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public PlanPhase getCurrentPhase() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public DateTime getChargedThroughDate() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public DateTime getPaidThroughDate() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public List<SubscriptionTransition> getActiveTransitions() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public List<SubscriptionTransition> getAllTransitions() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
 	@Override
 	public SubscriptionTransition getPendingTransition() {
 		throw new UnsupportedOperationException();
-		
+
 	}
 
+    @Override
+    public SubscriptionTransition getPreviousTransition() {
+        return null;
+    }
+
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2204274..c5881f9 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -58,7 +58,7 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
 
     public static interface ResetSqlDao extends Transactional<ResetSqlDao>, CloseMe {
 
-        @SqlUpdate("truncate table events")
+        @SqlUpdate("truncate table entitlement_events")
         public void resetEvents();
 
         @SqlUpdate("truncate table subscriptions")

invoice/pom.xml 2(+1 -1)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index cc98907..81f9eb0 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-invoice</artifactId>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index e7a26a3..3b27fa6 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -24,23 +24,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
-import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
-import com.ning.billing.invoice.model.RecurringInvoiceItem;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.Inject;
+import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
+import com.ning.billing.invoice.api.DefaultInvoiceService;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceCreationNotification;
 import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
-import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
+import com.ning.billing.invoice.model.RecurringInvoiceItem;
+import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
 import com.ning.billing.util.bus.Bus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 
 public class DefaultInvoiceDao implements InvoiceDao {
     private final static Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
@@ -49,21 +57,23 @@ public class DefaultInvoiceDao implements InvoiceDao {
     private final RecurringInvoiceItemSqlDao recurringInvoiceItemSqlDao;
     private final FixedPriceInvoiceItemSqlDao fixedPriceInvoiceItemSqlDao;
     private final InvoicePaymentSqlDao invoicePaymentSqlDao;
-    private final NextBillingDateNotifier notifier;
     private final EntitlementBillingApi entitlementBillingApi;
 
     private final Bus eventBus;
 
+	private NextBillingDatePoster nextBillingDatePoster;
+
     @Inject
     public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus,
-                             final NextBillingDateNotifier notifier, final EntitlementBillingApi entitlementBillingApi) {
+                             final EntitlementBillingApi entitlementBillingApi,
+                             NextBillingDatePoster nextBillingDatePoster) {
         this.invoiceSqlDao = dbi.onDemand(InvoiceSqlDao.class);
         this.recurringInvoiceItemSqlDao = dbi.onDemand(RecurringInvoiceItemSqlDao.class);
         this.fixedPriceInvoiceItemSqlDao = dbi.onDemand(FixedPriceInvoiceItemSqlDao.class);
         this.invoicePaymentSqlDao = dbi.onDemand(InvoicePaymentSqlDao.class);
         this.eventBus = eventBus;
-        this.notifier = notifier;
         this.entitlementBillingApi = entitlementBillingApi;
+        this.nextBillingDatePoster = nextBillingDatePoster;
     }
 
     @Override
@@ -273,12 +283,12 @@ public class DefaultInvoiceDao implements InvoiceDao {
                 if ((recurringInvoiceItem.getEndDate() != null) &&
                         (recurringInvoiceItem.getAmount() == null ||
                                 recurringInvoiceItem.getAmount().compareTo(BigDecimal.ZERO) >= 0)) {
-                notifier.insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
+                	nextBillingDatePoster.insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
                 }
             }
         }
     }
-
+    
     private void setChargedThroughDates(final InvoiceSqlDao dao, final Collection<InvoiceItem> fixedPriceItems,
                                         final Collection<InvoiceItem> recurringItems) {
         Map<UUID, DateTime> chargeThroughDates = new HashMap<UUID, DateTime>();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
index 20765eb..1dfac5b 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
@@ -16,8 +16,8 @@
 
 package com.ning.billing.invoice.glue;
 
-import com.ning.billing.util.glue.GlobalLockerModule;
 import org.skife.config.ConfigurationObjectFactory;
+
 import com.google.inject.AbstractModule;
 import com.ning.billing.config.InvoiceConfig;
 import com.ning.billing.invoice.InvoiceListener;
@@ -32,8 +32,11 @@ import com.ning.billing.invoice.dao.InvoiceDao;
 import com.ning.billing.invoice.model.DefaultInvoiceGenerator;
 import com.ning.billing.invoice.model.InvoiceGenerator;
 import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.DefaultNextBillingDatePoster;
 import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
 import com.ning.billing.util.glue.ClockModule;
+import com.ning.billing.util.glue.GlobalLockerModule;
 
 
 public class InvoiceModule extends AbstractModule {
@@ -64,6 +67,7 @@ public class InvoiceModule extends AbstractModule {
 
     protected void installNotifier() {
         bind(NextBillingDateNotifier.class).to(DefaultNextBillingDateNotifier.class).asEagerSingleton();
+        bind(NextBillingDatePoster.class).to(DefaultNextBillingDatePoster.class).asEagerSingleton();
     }
 
     protected void installInvoiceListener() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
new file mode 100644
index 0000000..0a23d01
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.ErrorCode;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountUserApi;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.entitlement.api.billing.BillingEvent;
+import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
+import com.ning.billing.entitlement.api.user.SubscriptionTransition;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceApiException;
+import com.ning.billing.invoice.api.InvoiceItem;
+import com.ning.billing.invoice.dao.InvoiceDao;
+import com.ning.billing.invoice.model.BillingEventSet;
+import com.ning.billing.invoice.model.InvoiceGenerator;
+import com.ning.billing.invoice.model.InvoiceItemList;
+import com.ning.billing.util.globallocker.GlobalLock;
+import com.ning.billing.util.globallocker.GlobalLocker;
+import com.ning.billing.util.globallocker.LockFailedException;
+import com.ning.billing.util.globallocker.GlobalLocker.LockerService;
+
+public class InvoiceDispatcher {
+	private final static Logger log = LoggerFactory.getLogger(InvoiceDispatcher.class);
+    private final static int NB_LOCK_TRY = 5;
+
+    private final InvoiceGenerator generator;
+    private final EntitlementBillingApi entitlementBillingApi;
+    private final AccountUserApi accountUserApi;
+    private final InvoiceDao invoiceDao;
+    private final GlobalLocker locker;
+
+    private final static boolean VERBOSE_OUTPUT = false;
+    @Inject
+    public InvoiceDispatcher(final InvoiceGenerator generator, final AccountUserApi accountUserApi,
+                           final EntitlementBillingApi entitlementBillingApi,
+                           final InvoiceDao invoiceDao,
+                           final GlobalLocker locker) {
+        this.generator = generator;
+        this.entitlementBillingApi = entitlementBillingApi;
+        this.accountUserApi = accountUserApi;
+        this.invoiceDao = invoiceDao;
+        this.locker = locker;
+    }
+
+
+    public void processSubscription(final SubscriptionTransition transition) throws InvoiceApiException {
+        UUID subscriptionId = transition.getSubscriptionId();
+        DateTime targetDate = transition.getEffectiveTransitionTime();
+        log.info("Got subscription transition from InvoiceListener. id: " + subscriptionId.toString() + "; targetDate: " + targetDate.toString());
+        log.info("Transition type: " + transition.getTransitionType().toString());
+        processSubscription(subscriptionId, targetDate);
+    }
+
+    public void processSubscription(final UUID subscriptionId, final DateTime targetDate) throws InvoiceApiException {
+        if (subscriptionId == null) {
+            log.error("Failed handling entitlement change.", new InvoiceApiException(ErrorCode.INVOICE_INVALID_TRANSITION));
+            return;
+        }
+
+        UUID accountId = entitlementBillingApi.getAccountIdFromSubscriptionId(subscriptionId);
+        if (accountId == null) {
+            log.error("Failed handling entitlement change.",
+                    new InvoiceApiException(ErrorCode.INVOICE_NO_ACCOUNT_ID_FOR_SUBSCRIPTION_ID, subscriptionId.toString()));
+            return;
+        }
+
+        GlobalLock lock = null;
+        try {
+            lock = locker.lockWithNumberOfTries(LockerService.INVOICE, accountId.toString(), NB_LOCK_TRY);
+
+            processAccountWithLock(accountId, targetDate);
+
+        } catch (LockFailedException e) {
+            // Not good!
+            log.error(String.format("Failed to process invoice for account %s, subscription %s, targetDate %s",
+                    accountId.toString(), subscriptionId.toString(), targetDate), e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+    }
+
+    private void processAccountWithLock(final UUID accountId, final DateTime targetDate) throws InvoiceApiException {
+
+        Account account = accountUserApi.getAccountById(accountId);
+        if (account == null) {
+            log.error("Failed handling entitlement change.",
+                    new InvoiceApiException(ErrorCode.INVOICE_ACCOUNT_ID_INVALID, accountId.toString()));
+            return;
+        }
+
+        SortedSet<BillingEvent> events = entitlementBillingApi.getBillingEventsForAccount(accountId);
+        BillingEventSet billingEvents = new BillingEventSet(events);
+
+        Currency targetCurrency = account.getCurrency();
+
+        List<InvoiceItem> items = invoiceDao.getInvoiceItemsByAccount(accountId);
+        InvoiceItemList invoiceItemList = new InvoiceItemList(items);
+        Invoice invoice = generator.generateInvoice(accountId, billingEvents, invoiceItemList, targetDate, targetCurrency);
+
+        if (invoice == null) {
+            log.info("Generated null invoice.");
+            outputDebugData(events, invoiceItemList);
+        } else {
+            log.info("Generated invoice {} with {} items.", invoice.getId().toString(), invoice.getNumberOfItems());
+
+            if (VERBOSE_OUTPUT) {
+                log.info("New items");
+                for (InvoiceItem item : invoice.getInvoiceItems()) {
+                    log.info(item.toString());
+                }
+            }
+            outputDebugData(events, invoiceItemList);
+
+            if (invoice.getNumberOfItems() > 0) {
+                invoiceDao.create(invoice);
+            }
+        }
+    }
+
+    private void outputDebugData(Collection<BillingEvent> events, Collection<InvoiceItem> invoiceItemList) {
+        if (VERBOSE_OUTPUT) {
+            log.info("Events");
+            for (BillingEvent event : events) {
+                log.info(event.toString());
+            }
+
+            log.info("Existing items");
+            for (InvoiceItem item : invoiceItemList) {
+                log.info(item.toString());
+            }
+        }
+    }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
index 038d674..c348f83 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
@@ -16,9 +16,6 @@
 
 package com.ning.billing.invoice;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.SortedSet;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -27,69 +24,30 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
-import com.ning.billing.ErrorCode;
-import com.ning.billing.account.api.Account;
-import com.ning.billing.account.api.AccountUserApi;
-import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.entitlement.api.billing.BillingEvent;
-import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
-import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
-import com.ning.billing.invoice.api.InvoiceItem;
-import com.ning.billing.invoice.dao.InvoiceDao;
-import com.ning.billing.invoice.model.BillingEventSet;
-import com.ning.billing.invoice.model.InvoiceGenerator;
-import com.ning.billing.invoice.model.InvoiceItemList;
-import com.ning.billing.invoice.notification.NextBillingDateEvent;
-import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.globallocker.GlobalLock;
-import com.ning.billing.util.globallocker.GlobalLocker;
-import com.ning.billing.util.globallocker.GlobalLocker.LockerService;
-import com.ning.billing.util.globallocker.LockFailedException;
 
 public class InvoiceListener {
-
-
     private final static Logger log = LoggerFactory.getLogger(InvoiceListener.class);
-    private final static int NB_LOCK_TRY = 5;
-
-    private final InvoiceGenerator generator;
-    private final EntitlementBillingApi entitlementBillingApi;
-    private final AccountUserApi accountUserApi;
-    private final InvoiceDao invoiceDao;
-    private final Clock clock;
-    private final GlobalLocker locker;
-
-    private final static boolean VERBOSE_OUTPUT = false;
+	private InvoiceDispatcher dispatcher;
 
     @Inject
-    public InvoiceListener(final InvoiceGenerator generator, final AccountUserApi accountUserApi,
-                           final EntitlementBillingApi entitlementBillingApi,
-                           final InvoiceDao invoiceDao,
-                           final GlobalLocker locker,
-                           final Clock clock) {
-        this.generator = generator;
-        this.entitlementBillingApi = entitlementBillingApi;
-        this.accountUserApi = accountUserApi;
-        this.invoiceDao = invoiceDao;
-        this.locker = locker;
-        this.clock = clock;
+    public InvoiceListener(InvoiceDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
     }
 
     @Subscribe
     public void handleSubscriptionTransition(final SubscriptionTransition transition) {
         try {
-            processSubscription(transition);
+        	dispatcher.processSubscription(transition);
         } catch (InvoiceApiException e) {
             log.error(e.getMessage());
         }
     }
 
-    public void handleNextBillingDateEvent(final NextBillingDateEvent event) {
-        // STEPH should we use the date of the event instead?
+    public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime) {
         try {
-            processSubscription(event.getSubscriptionId(), clock.getUTCNow());
+        	dispatcher.processSubscription(subscriptionId, eventDateTime);
         } catch (InvoiceApiException e) {
             log.error(e.getMessage());
         }
@@ -160,32 +118,7 @@ public class InvoiceListener {
             outputDebugData(events, invoiceItemList);
         } else {
             log.info("Generated invoice {} with {} items.", invoice.getId().toString(), invoice.getNumberOfItems());
+=======
+>>>>>>> master
 
-            if (VERBOSE_OUTPUT) {
-                log.info("New items");
-                for (InvoiceItem item : invoice.getInvoiceItems()) {
-                    log.info(item.toString());
-                }
-            }
-            outputDebugData(events, invoiceItemList);
-
-            if (invoice.getNumberOfItems() > 0) {
-                invoiceDao.create(invoice);
-            }
-        }
-    }
-
-    private void outputDebugData(Collection<BillingEvent> events, Collection<InvoiceItem> invoiceItemList) {
-        if (VERBOSE_OUTPUT) {
-            log.info("Events");
-            for (BillingEvent event : events) {
-                log.info(event.toString());
-            }
-
-            log.info("Existing items");
-            for (InvoiceItem item : invoiceItemList) {
-                log.info(item.toString());
-            }
-        }
-    }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
index 92180ac..039f305 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
@@ -109,10 +109,6 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
                     proposedItemIterator.remove();
                 }
             }
-//            if (existingInvoiceItems.contains(proposedItem)) {
-//                existingInvoiceItems.remove(proposedItem);
-//                proposedItemIterator.remove();
-//            }
         }
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 3fd03b2..0dd5e3a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -18,8 +18,6 @@ package com.ning.billing.invoice.notification;
 
 import java.util.UUID;
 
-import com.ning.billing.entitlement.api.user.Subscription;
-import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
@@ -27,9 +25,11 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.invoice.InvoiceListener;
 import com.ning.billing.invoice.api.DefaultInvoiceService;
 import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
 import com.ning.billing.util.notificationq.NotificationConfig;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -41,22 +41,22 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
 
     private final static Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
 
-    private static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+    public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
 
-    private final Bus eventBus;
     private final NotificationQueueService notificationQueueService;
 	private final InvoiceConfig config;
     private final EntitlementDao entitlementDao;
 
     private NotificationQueue nextBillingQueue;
+	private InvoiceListener listener;
 
     @Inject
-	public DefaultNextBillingDateNotifier(NotificationQueueService notificationQueueService, Bus eventBus,
-                                          InvoiceConfig config, EntitlementDao entitlementDao){
+	public DefaultNextBillingDateNotifier(NotificationQueueService notificationQueueService, 
+			InvoiceConfig config, EntitlementDao entitlementDao, InvoiceListener listener){
 		this.notificationQueueService = notificationQueueService;
 		this.config = config;
-		this.eventBus = eventBus;
         this.entitlementDao = entitlementDao;
+        this.listener = listener; 
 	}
 
     @Override
@@ -66,15 +66,14 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
             		NEXT_BILLING_DATE_NOTIFIER_QUEUE,
                     new NotificationQueueHandler() {
                 @Override
-                public void handleReadyNotification(String notificationKey) {
-                	UUID subscriptionId;
+                public void handleReadyNotification(String notificationKey, DateTime eventDate) {
                 	try {
                  		UUID key = UUID.fromString(notificationKey);
                         Subscription subscription = entitlementDao.getSubscriptionFromId(key);
                         if (subscription == null) {
                             log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")" );
                         } else {
-                            processEvent(key);
+                            processEvent(key , eventDate);
                         }
                 	} catch (IllegalArgumentException e) {
                 		log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
@@ -118,27 +117,9 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
         }
     }
 
-    private void processEvent(UUID subscriptionId) {
-        try {
-            eventBus.post(new NextBillingDateEvent(subscriptionId));
-        } catch (EventBusException e) {
-            log.error("Failed to post entitlement event " + subscriptionId, e);
-        }
+    private void processEvent(UUID subscriptionId, DateTime eventDateTime) {
+        listener.handleNextBillingDateEvent(subscriptionId, eventDateTime);
     }
 
-    @Override
-    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
-    	if (nextBillingQueue != null) {
-            log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
-
-            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
-                @Override
-                public String toString() {
-                    return subscriptionId.toString();
-                }
-    	    });
-        } else {
-            log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).");
-        }
-    }
+ 
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
index d33dc61..ea630aa 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -16,10 +16,6 @@
 
 package com.ning.billing.invoice.notification;
 
-import java.util.UUID;
-
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 public interface NextBillingDateNotifier {
 
@@ -29,7 +25,4 @@ public interface NextBillingDateNotifier {
 
     public void stop();
 
-	public void insertNextBillingNotification(Transmogrifier transactionalDao,
-			UUID subscriptionId, DateTime futureNotificationTime);
-
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockSubscription.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockSubscription.java
index 005a605..7a9253c 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockSubscription.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockSubscription.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.UUID;
 
 public class MockSubscription implements Subscription {
-    private UUID subscriptionId = UUID.randomUUID();
+    private final UUID subscriptionId = UUID.randomUUID();
 
     @Override
     public void cancel(DateTime requestedDate, boolean eot) throws EntitlementUserApiException {
@@ -119,4 +119,9 @@ public class MockSubscription implements Subscription {
     public SubscriptionTransition getPendingTransition() {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public SubscriptionTransition getPreviousTransition() {
+        return null;
+    }
 }
\ No newline at end of file
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/BrainDeadSubscription.java b/invoice/src/test/java/com/ning/billing/invoice/notification/BrainDeadSubscription.java
new file mode 100644
index 0000000..587144b
--- /dev/null
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/BrainDeadSubscription.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.Plan;
+import com.ning.billing.catalog.api.PlanPhase;
+import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionTransition;
+
+public class BrainDeadSubscription implements Subscription {
+
+	@Override
+	public void cancel(DateTime requestedDate, boolean eot)
+			throws EntitlementUserApiException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void uncancel() throws EntitlementUserApiException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void changePlan(String productName, BillingPeriod term,
+			String planSet, DateTime requestedDate)
+			throws EntitlementUserApiException {
+		throw new UnsupportedOperationException();
+
+		
+	}
+
+	@Override
+	public void pause() throws EntitlementUserApiException {
+		throw new UnsupportedOperationException();
+
+		
+	}
+
+	@Override
+	public void resume() throws EntitlementUserApiException {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public UUID getId() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public UUID getBundleId() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public SubscriptionState getState() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public DateTime getStartDate() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public DateTime getEndDate() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public Plan getCurrentPlan() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public String getCurrentPriceList() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public PlanPhase getCurrentPhase() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public DateTime getChargedThroughDate() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public DateTime getPaidThroughDate() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public List<SubscriptionTransition> getActiveTransitions() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public List<SubscriptionTransition> getAllTransitions() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public SubscriptionTransition getPendingTransition() {
+		throw new UnsupportedOperationException();
+		
+	}
+
+	@Override
+	public SubscriptionTransition getPreviousTransition() {
+		throw new UnsupportedOperationException();
+	}
+
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index d2fcdee..67c27fa 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -16,17 +16,15 @@
 
 package com.ning.billing.invoice.notification;
 
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
-import com.ning.billing.catalog.DefaultCatalogService;
-import com.ning.billing.catalog.api.CatalogService;
-import com.ning.billing.config.CatalogConfig;
-import com.ning.billing.entitlement.engine.dao.EntitlementDao;
-import com.ning.billing.entitlement.engine.dao.EntitlementSqlDao;
-import com.ning.billing.util.bus.InMemoryBus;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
 import org.skife.config.ConfigurationObjectFactory;
@@ -38,25 +36,36 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import com.google.common.eventbus.Subscribe;
+
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
+import com.ning.billing.catalog.DefaultCatalogService;
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.config.CatalogConfig;
 import com.ning.billing.config.InvoiceConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.entitlement.api.migration.AccountMigrationData;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.entitlement.engine.dao.EntitlementSqlDao;
+import com.ning.billing.entitlement.events.EntitlementEvent;
+import com.ning.billing.invoice.InvoiceListener;
+import com.ning.billing.invoice.dao.DefaultInvoiceDao;
 import com.ning.billing.lifecycle.KillbillService.ServiceException;
+import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.InMemoryBus;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
 import com.ning.billing.util.notificationq.DummySqlTest;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
-import static com.jayway.awaitility.Awaitility.await;
-import static java.util.concurrent.TimeUnit.MINUTES;
-
 public class TestNextBillingDateNotifier {
     private static Logger log = LoggerFactory.getLogger(TestNextBillingDateNotifier.class);
 	private Clock clock;
@@ -64,7 +73,159 @@ public class TestNextBillingDateNotifier {
 	private DummySqlTest dao;
 	private Bus eventBus;
 	private MysqlTestingHelper helper;
+	private InvoiceListenerMock listener = new InvoiceListenerMock();
+	private NotificationQueueService notificationQueueService;
+
+	private static final class InvoiceListenerMock extends InvoiceListener {
+		int eventCount = 0;
+		UUID latestSubscriptionId = null;
+
+		public InvoiceListenerMock() {
+			super(null);
+		}
+		
+
+		@Override
+		public void handleNextBillingDateEvent(UUID subscriptionId,
+				DateTime eventDateTime) {
+			eventCount++;
+			latestSubscriptionId=subscriptionId;
+		}
+		
+		public int getEventCount() {
+			return eventCount;
+		}
+		
+		public UUID getLatestSubscriptionId(){
+			return latestSubscriptionId;
+		}
+		
+	}
+	
+	private class MockEntitlementDao implements EntitlementDao {
+
+		@Override
+		public List<SubscriptionBundle> getSubscriptionBundleForAccount(
+				UUID accountId) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public SubscriptionBundle getSubscriptionBundleFromKey(String bundleKey) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public SubscriptionBundle getSubscriptionBundleFromId(UUID bundleId) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public SubscriptionBundle createSubscriptionBundle(
+				SubscriptionBundleData bundle) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public Subscription getSubscriptionFromId(UUID subscriptionId) {
+			return new BrainDeadSubscription();
+
+		}
+
+		@Override
+		public UUID getAccountIdFromSubscriptionId(UUID subscriptionId) {
+			throw new UnsupportedOperationException();
+
+		}
 
+		@Override
+		public Subscription getBaseSubscription(UUID bundleId) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public List<Subscription> getSubscriptions(UUID bundleId) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public List<Subscription> getSubscriptionsForKey(String bundleKey) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public void updateSubscription(SubscriptionData subscription) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void createNextPhaseEvent(UUID subscriptionId,
+				EntitlementEvent nextPhase) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public EntitlementEvent getEventById(UUID eventId) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public List<EntitlementEvent> getEventsForSubscription(
+				UUID subscriptionId) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public List<EntitlementEvent> getPendingEventsForSubscription(
+				UUID subscriptionId) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public void createSubscription(SubscriptionData subscription,
+				List<EntitlementEvent> initialEvents) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancelSubscription(UUID subscriptionId,
+				EntitlementEvent cancelEvent) {
+			throw new UnsupportedOperationException();
+
+		}
+
+		@Override
+		public void uncancelSubscription(UUID subscriptionId,
+				List<EntitlementEvent> uncancelEvents) {
+			throw new UnsupportedOperationException();
+	
+		}
+
+		@Override
+		public void changePlan(UUID subscriptionId,
+				List<EntitlementEvent> changeEvents) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void migrate(UUID acountId, AccountMigrationData data) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void undoMigration(UUID accountId) {
+			throw new UnsupportedOperationException();
+		}
+		
+	}
+	
 	@BeforeClass(groups={"setup"})
 	public void setup() throws ServiceException, IOException, ClassNotFoundException, SQLException {
 		//TestApiBase.loadSystemPropertiesFromClasspath("/entitlement.properties");
@@ -91,7 +252,8 @@ public class TestNextBillingDateNotifier {
         dao = dbi.onDemand(DummySqlTest.class);
         eventBus = g.getInstance(Bus.class);
         helper = g.getInstance(MysqlTestingHelper.class);
-        notifier = new DefaultNextBillingDateNotifier(g.getInstance(NotificationQueueService.class), eventBus, g.getInstance(InvoiceConfig.class), g.getInstance(EntitlementDao.class));
+        notificationQueueService = g.getInstance(NotificationQueueService.class);
+        notifier = new DefaultNextBillingDateNotifier(notificationQueueService,g.getInstance(InvoiceConfig.class), new MockEntitlementDao(), listener);
         startMysql();
 	}
 
@@ -105,48 +267,30 @@ public class TestNextBillingDateNotifier {
         helper.initDb(entitlementDdl);
 	}
 
-	public static class NextBillingEventListener {
-		private int eventCount=0;
-		private NextBillingDateEvent event;
 
-		public int getEventCount() {
-			return eventCount;
-		}
-
-		@Subscribe
-		public synchronized void processEvent(NextBillingDateEvent event) {
-			eventCount++;
-			this.event = event;
-			//log.debug("Got event {} {}", event.name, event.value);
-		}
-		
-		public NextBillingDateEvent getLatestEvent() {
-			return event;
-		}
-	}
-
-	@Test(enabled=false, groups="slow")
+	@Test(enabled=true, groups="slow")
 	public void test() throws Exception {
-		final UUID subscriptionId = new UUID(0L,1000L);
+		final UUID subscriptionId = new UUID(0L,1L);
 		final DateTime now = new DateTime();
 		final DateTime readyTime = now.plusMillis(2000);
+		final NextBillingDatePoster poster = new DefaultNextBillingDatePoster(notificationQueueService); 
 
-		final NextBillingEventListener listener = new NextBillingEventListener();
 		eventBus.start();
 		notifier.initialize();
 		notifier.start();
+		
 
-        eventBus.register(listener);
 		dao.inTransaction(new Transaction<Void, DummySqlTest>() {
 			@Override
 			public Void inTransaction(DummySqlTest transactional,
 					TransactionStatus status) throws Exception {
 
-				notifier.insertNextBillingNotification(transactional, subscriptionId, readyTime);
+				poster.insertNextBillingNotification(transactional, subscriptionId, readyTime);
 				return null;
 			}
 		});
-
+		
+		
 		// Move time in the future after the notification effectiveDate
 		((ClockMock) clock).setDeltaFromReality(3000);
 
@@ -159,6 +303,6 @@ public class TestNextBillingDateNotifier {
 	        });
 
 		Assert.assertEquals(listener.getEventCount(), 1);
-		Assert.assertEquals(listener.getLatestEvent().getSubscriptionId(), subscriptionId);
+		Assert.assertEquals(listener.getLatestSubscriptionId(), subscriptionId);
 	}
 }

payment/pom.xml 2(+1 -1)

diff --git a/payment/pom.xml b/payment/pom.xml
index 3e050f0..bdb30c8 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-payment</artifactId>
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index c868b05..a8364bf 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -23,8 +23,6 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 
-import com.ning.billing.invoice.api.InvoicePayment;
-import com.ning.billing.invoice.model.DefaultInvoicePayment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +31,7 @@ import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
+import com.ning.billing.invoice.model.DefaultInvoicePayment;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.PaymentProviderPlugin;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
@@ -179,9 +178,9 @@ public class DefaultPaymentApi implements PaymentApi {
                 invoicePaymentApi.notifyOfPaymentAttempt(new DefaultInvoicePayment(paymentAttempt.getPaymentAttemptId(),
                                                                                    invoice.getId(),
                                                                                    paymentAttempt.getPaymentAttemptDate(),
-                                                                                   paymentInfo == null ? null : paymentInfo.getAmount(),
+                                                                                   paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : paymentInfo.getAmount(),
 //                                                                                 paymentInfo.getRefundAmount(), TODO
-                                                                                   paymentInfo == null ? null : invoice.getCurrency()));
+                                                                                   paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : invoice.getCurrency()));
 
             }
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
new file mode 100644
index 0000000..71c7e41
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment.provider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import com.ning.billing.account.api.Account;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.payment.api.Either;
+import com.ning.billing.payment.api.PaymentError;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.api.PaymentMethodInfo;
+import com.ning.billing.payment.api.PaymentProviderAccount;
+
+public class NoOpPaymentProviderPlugin implements PaymentProviderPlugin {
+
+    @Override
+    public Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice) {
+        PaymentInfo payment = new PaymentInfo.Builder()
+                                             .setPaymentId(UUID.randomUUID().toString())
+                                             .setAmount(invoice.getBalance())
+                                             .setStatus("Processed")
+                                             .setCreatedDate(new DateTime(DateTimeZone.UTC))
+                                             .setEffectiveDate(new DateTime(DateTimeZone.UTC))
+                                             .setType("Electronic")
+                                             .build();
+        return Either.right(payment);
+    }
+
+    @Override
+    public Either<PaymentError, PaymentInfo> getPaymentInfo(String paymentId) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, String> createPaymentProviderAccount(Account account) {
+        return Either.left(new PaymentError("unsupported", "Account creation not supported in this plugin"));
+    }
+
+    @Override
+    public Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, String> addPaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+        return Either.right(null);
+    }
+
+    public void setDefaultPaymentMethodOnAccount(PaymentProviderAccount account, String paymentMethodId) {
+        // NO-OP
+    }
+
+    @Override
+    public Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+        return Either.right(paymentMethod);
+    }
+
+    @Override
+    public Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(final String accountKey) {
+        return Either.right(Arrays.<PaymentMethodInfo>asList());
+    }
+
+    @Override
+    public Either<PaymentError, Void> updatePaymentGateway(String accountKey) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, Void> updatePaymentProviderAccountExistingContact(Account account) {
+        return Either.right(null);
+    }
+
+    @Override
+    public Either<PaymentError, Void> updatePaymentProviderAccountWithNewContact(Account account) {
+        return Either.right(null);
+    }
+
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPluginProvider.java b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPluginProvider.java
new file mode 100644
index 0000000..5ba98b7
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPluginProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment.provider;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+
+public class NoOpPaymentProviderPluginProvider implements Provider<NoOpPaymentProviderPlugin> {
+    private PaymentProviderPluginRegistry registry;
+    private final String instanceName;
+
+    public NoOpPaymentProviderPluginProvider(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    @Inject
+    public void setPaymentProviderPluginRegistry(PaymentProviderPluginRegistry registry) {
+        this.registry = registry;
+    }
+
+    @Override
+    public NoOpPaymentProviderPlugin get() {
+        NoOpPaymentProviderPlugin plugin = new NoOpPaymentProviderPlugin();
+
+        registry.register(plugin, instanceName);
+        return plugin;
+    }
+
+}
diff --git a/payment/src/main/resources/com/ning/billing/payment/ddl.sql b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
index a8bd47f..abbd8a6 100644
--- a/payment/src/main/resources/com/ning/billing/payment/ddl.sql
+++ b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
@@ -25,7 +25,7 @@ CREATE TABLE payments (
       status varchar(20) COLLATE utf8_bin,
       reference_id varchar(36) COLLATE utf8_bin,
       payment_type varchar(20) COLLATE utf8_bin,
-      payment_method_id varchar(20) COLLATE utf8_bin,
+      payment_method_id varchar(36) COLLATE utf8_bin,
       payment_method varchar(20) COLLATE utf8_bin,
       card_type varchar(20) COLLATE utf8_bin,
       card_country varchar(50) COLLATE utf8_bin,

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index dd142a7..179d4c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,7 @@
     <groupId>com.ning.billing</groupId>
     <artifactId>killbill</artifactId>
     <packaging>pom</packaging>
-    <version>0.1.5-SNAPSHOT</version>
+    <version>0.1.6-SNAPSHOT</version>
     <name>killbill</name>
     <description>Library for managing recurring subscriptions and the associated billing</description>
     <url>http://github.com/ning/killbill</url>

util/pom.xml 2(+1 -1)

diff --git a/util/pom.xml b/util/pom.xml
index b16ecfa..68648ef 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.5-SNAPSHOT</version>
+        <version>0.1.6-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-util</artifactId>
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 9e4460b..31105c8 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class InMemoryBus implements Bus {
 
-    // STEPH config ?
-    private final static int MAX_EVENT_THREADS = 1;
-
     private final static String EVENT_BUS_IDENTIFIER = "bus-service";
     private final static String EVENT_BUS_GROUP_NAME = "bus-grp";
     private final static String EVENT_BUS_TH_NAME = "bus-th";
@@ -68,7 +65,7 @@ public class InMemoryBus implements Bus {
     public InMemoryBus() {
 
         final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
-        Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
+        Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(group, r, EVENT_BUS_TH_NAME);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3c19a2b..8e2aaf8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,26 +38,29 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected void doProcessEvents(final int sequenceId) {
+    protected int doProcessEvents(final int sequenceId) {
 
         logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
-            return;
+            return 0;
         }
 
         logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
 
+        int result = 0;
         for (final Notification cur : notifications) {
             nbProcessedEvents.incrementAndGet();
             logDebug("handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
-            handler.handleReadyNotification(cur.getNotificationKey());
+            handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+            result++;
             clearNotification(cur);
             logDebug("done handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
+        return result;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 91e7110..3b96ee4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -36,5 +36,4 @@ public class DefaultNotificationQueueService extends NotificationQueueServiceBas
             NotificationConfig config) {
         return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config);
     }
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4ea38f7..e1dcdbf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,8 +38,9 @@ public interface NotificationQueue {
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
+    * @return the number of entries we processed
     */
-   public void processReadyNotification();
+   public int processReadyNotification();
 
    /**
     * Stops the queue. Blocks until queue is completely stopped.
@@ -55,4 +56,10 @@ public interface NotificationQueue {
     */
    public void startQueue();
 
+   /**
+    *
+    * @return the name of that queue
+    */
+   public String getFullQName();
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 9a42d2e..cc1ea28 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -57,10 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     // Use this object's monitor for synchronization (no need for volatile)
     protected boolean isProcessingEvents;
-    
+
     private boolean startedComplete = false;
     private boolean stoppedComplete = false;
-    
+
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         this.clock = clock;
@@ -88,8 +88,8 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
 
     @Override
-    public void processReadyNotification() {
-        doProcessEvents(sequenceId.incrementAndGet());
+    public int processReadyNotification() {
+        return doProcessEvents(sequenceId.incrementAndGet());
     }
 
 
@@ -181,14 +181,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         });
         waitForNotificationStartCompletion();
     }
-    
+
     private void completedQueueStop() {
     	synchronized (this) {
     		stoppedComplete = true;
             this.notifyAll();
         }
     }
-    
+
     private void completedQueueStart() {
         synchronized (this) {
         	startedComplete = true;
@@ -233,9 +233,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         }
     }
 
-    protected String getFullQName() {
+    @Override
+    public String getFullQName() {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract void doProcessEvents(int sequenceId);
+    protected abstract int doProcessEvents(int sequenceId);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 4fb17b5..4d56b03 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import org.joda.time.DateTime;
+
 
 public interface NotificationQueueService {
 
@@ -25,7 +27,7 @@ public interface NotificationQueueService {
          *
          * @param notificationKey the notification key associated to that notification entry
          */
-        public void handleReadyNotification(String notificationKey);
+        public void handleReadyNotification(String notificationKey, DateTime eventDateTime);
      }
 
     public static final class NotificationQueueAlreadyExists extends Exception {
@@ -57,7 +59,7 @@ public interface NotificationQueueService {
      * @throws com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists is the queue associated with that service and name already exits
      *
      */
-    NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+    public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
         throws NotificationQueueAlreadyExists;
 
     /**
@@ -69,7 +71,14 @@ public interface NotificationQueueService {
      *
      * @throws NoSuchNotificationQueue if queue does not exist
      */
-    NotificationQueue getNotificationQueue(final String svcName, final String queueName)
+    public NotificationQueue getNotificationQueue(final String svcName, final String queueName)
         throws NoSuchNotificationQueue;
 
+
+    /**
+     *
+     * @param services
+     * @return the number of processed notifications
+     */
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 98a10b1..3f8f26f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -16,12 +16,16 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 import com.ning.billing.util.clock.Clock;
 
@@ -79,6 +83,48 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
     }
 
 
+    //
+    // Test ONLY
+    //
+    @Override
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+
+        int result = 0;
+
+        List<NotificationQueue> manualQueues = null;
+        if (services == null) {
+            manualQueues = new ArrayList<NotificationQueue>(queues.values());
+        } else {
+            Joiner join = Joiner.on(",");
+            join.join(services);
+
+            log.info("Trigger manual processing for services {} ", join.toString());
+            manualQueues = new LinkedList<NotificationQueue>();
+            synchronized (queues) {
+                for (String svc : services) {
+                    addQueuesForService(manualQueues, svc);
+                }
+            }
+        }
+        for (NotificationQueue cur : manualQueues) {
+            int processedNotifications = 0;
+            do {
+                processedNotifications = cur.processReadyNotification();
+                log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
+                result += processedNotifications;
+            } while(keepRunning && processedNotifications > 0);
+        }
+        return result;
+    }
+
+    private final void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
+        for (String cur : queues.keySet()) {
+            if (cur.startsWith(svcName)) {
+                result.add(queues.get(cur));
+            }
+        }
+    }
+
     protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
             String queueName, NotificationQueueHandler handler,
             NotificationConfig config);
diff --git a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
index 1e949a2..5ee7e88 100644
--- a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
+++ b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
@@ -39,6 +39,9 @@ import com.mysql.management.MysqldResourceI;
  */
 public class MysqlTestingHelper
 {
+
+    public static final String USE_LOCAL_DB_PROP = "com.ning.billing.dbi.test.useLocalDb";
+
     private static final Logger log = LoggerFactory.getLogger(MysqlTestingHelper.class);
 
     private static final String DB_NAME = "test_killbill";
@@ -47,24 +50,38 @@ public class MysqlTestingHelper
 
     private File dbDir;
     private MysqldResource mysqldResource;
-    private int port = 0;
+    private int port;
 
     public MysqlTestingHelper()
     {
-        // New socket on any free port
-        final ServerSocket socket;
-        try {
-            socket = new ServerSocket(0);
-            port = socket.getLocalPort();
-            socket.close();
-        }
-        catch (IOException e) {
-            Assert.fail();
+        if (isUsingLocalInstance()) {
+            port = 3306;
+        } else {
+            // New socket on any free port
+            final ServerSocket socket;
+            try {
+                socket = new ServerSocket(0);
+                port = socket.getLocalPort();
+                socket.close();
+            }
+            catch (IOException e) {
+                Assert.fail();
+            }
         }
     }
 
+
+    public boolean isUsingLocalInstance() {
+        return (System.getProperty(USE_LOCAL_DB_PROP) != null);
+    }
+
     public void startMysql() throws IOException
     {
+
+        if (isUsingLocalInstance()) {
+            return;
+        }
+
         dbDir = File.createTempFile("mysql", "");
         dbDir.delete();
         dbDir.mkdir();
@@ -87,6 +104,12 @@ public class MysqlTestingHelper
 
     public void cleanupTable(final String table)
     {
+
+        if (!isUsingLocalInstance() && (mysqldResource == null || !mysqldResource.isRunning())) {
+            log.error("Asked to cleanup table " + table + " but MySQL is not running!");
+            return;
+        }
+
         if (mysqldResource == null || !mysqldResource.isRunning()) {
             log.error("Asked to cleanup table " + table + " but MySQL is not running!");
             return;
@@ -122,6 +145,9 @@ public class MysqlTestingHelper
 
     public void initDb(final String ddl) throws IOException
     {
+        if (isUsingLocalInstance()) {
+            return;
+        }
         final IDBI dbi = getDBI();
         dbi.withHandle(new HandleCallback<Void>()
         {
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index 72fd8f4..ad3e5d3 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -38,13 +38,13 @@ public class ClockMock extends DefaultClock {
 
     private long deltaFromRealityMs;
     private List<Duration> deltaFromRealityDuration;
-    private long deltaFromRealitDurationEpsilon;
+    private long deltaFromRealityDurationEpsilon;
     private DeltaType deltaType;
 
     public ClockMock() {
         deltaType = DeltaType.DELTA_NONE;
         deltaFromRealityMs = 0;
-        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityDurationEpsilon = 0;
         deltaFromRealityDuration = null;
     }
 
@@ -58,7 +58,7 @@ public class ClockMock extends DefaultClock {
         return getNow(DateTimeZone.UTC);
     }
 
-    private void logClockAdjustement(DateTime prev, DateTime next) {
+    private void logClockAdjustment(DateTime prev, DateTime next) {
         log.info(String.format("            ************      ADJUSTING CLOCK FROM %s to %s     ********************", prev, next));
     }
 
@@ -67,9 +67,9 @@ public class ClockMock extends DefaultClock {
         deltaType = DeltaType.DELTA_DURATION;
         deltaFromRealityDuration = new ArrayList<Duration>();
         deltaFromRealityDuration.add(delta);
-        deltaFromRealitDurationEpsilon = epsilon;
+        deltaFromRealityDurationEpsilon = epsilon;
         deltaFromRealityMs = 0;
-        logClockAdjustement(prev, getUTCNow());
+        logClockAdjustment(prev, getUTCNow());
     }
 
     public synchronized void addDeltaFromReality(Duration delta) {
@@ -78,16 +78,16 @@ public class ClockMock extends DefaultClock {
             throw new RuntimeException("ClockMock should be set with type DELTA_DURATION");
         }
         deltaFromRealityDuration.add(delta);
-        logClockAdjustement(prev, getUTCNow());
+        logClockAdjustment(prev, getUTCNow());
     }
 
     public synchronized void setDeltaFromReality(long delta) {
         DateTime prev = getUTCNow();
         deltaType = DeltaType.DELTA_ABS;
         deltaFromRealityDuration = null;
-        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityDurationEpsilon = 0;
         deltaFromRealityMs = delta;
-        logClockAdjustement(prev, getUTCNow());
+        logClockAdjustment(prev, getUTCNow());
     }
 
     public synchronized void addDeltaFromReality(long delta) {
@@ -96,15 +96,15 @@ public class ClockMock extends DefaultClock {
             throw new RuntimeException("ClockMock should be set with type DELTA_ABS");
         }
         deltaFromRealityDuration = null;
-        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityDurationEpsilon = 0;
         deltaFromRealityMs += delta;
-        logClockAdjustement(prev, getUTCNow());
+        logClockAdjustment(prev, getUTCNow());
     }
 
     public synchronized void resetDeltaFromReality() {
         deltaType = DeltaType.DELTA_NONE;
         deltaFromRealityDuration = null;
-        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityDurationEpsilon = 0;
         deltaFromRealityMs = 0;
     }
 
@@ -125,8 +125,6 @@ public class ClockMock extends DefaultClock {
 
         DateTime result = input;
         for (Duration cur : deltaFromRealityDuration) {
-
-            int length = cur.getNumber();
             switch (cur.getUnit()) {
             case DAYS:
                 result = result.plusDays(cur.getNumber());
@@ -142,11 +140,11 @@ public class ClockMock extends DefaultClock {
 
             case UNLIMITED:
             default:
-                throw new RuntimeException("ClockMock is adjusting an unlimtited time period");
+                throw new RuntimeException("ClockMock is adjusting an unlimited time period");
             }
         }
-        if (deltaFromRealitDurationEpsilon != 0) {
-            result = result.plus(deltaFromRealitDurationEpsilon);
+        if (deltaFromRealityDurationEpsilon != 0) {
+            result = result.plus(deltaFromRealityDurationEpsilon);
         }
         return result;
     }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e1da366..e96d2cf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,7 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected void doProcessEvents(int sequenceId) {
+    protected int doProcessEvents(int sequenceId) {
+
+        int result = 0;
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
         List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -73,8 +75,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                     readyNotifications.add(cur);
                 }
             }
+
+            result = readyNotifications.size();
             for (Notification cur : readyNotifications) {
-                handler.handleReadyNotification(cur.getNotificationKey());
+                handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
                 DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
                 oldNotifications.add(cur);
                 processedNotifications.add(processedNotification);
@@ -87,6 +91,6 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 notifications.addAll(processedNotifications);
             }
         }
-
+        return result;
     }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index eac2d78..fbcf8f9 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -48,7 +48,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.clock.Clock;
@@ -119,7 +118,7 @@ public class TestNotificationQueue {
 		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
 				new NotificationQueueHandler() {
 			@Override
-			public void handleReadyNotification(String notificationKey) {
+			public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
 				synchronized (expectedNotifications) {
 	            	log.info("Handler received key: " + notificationKey);
 
@@ -182,7 +181,7 @@ public class TestNotificationQueue {
 		final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
 				new NotificationQueueHandler() {
 			@Override
-			public void handleReadyNotification(String notificationKey) {
+			public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
 				synchronized (expectedNotifications) {
 					expectedNotifications.put(notificationKey, Boolean.TRUE);
 					expectedNotifications.notify();
@@ -292,7 +291,7 @@ public class TestNotificationQueue {
 
 		final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
                 @Override
-                public void handleReadyNotification(String notificationKey) {
+                public void handleReadyNotification(String notificationKey, DateTime eventDateTime)  {
                 	log.info("Fred received key: " + notificationKey);
                 	expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
                 	eventsReceived++;
@@ -302,7 +301,7 @@ public class TestNotificationQueue {
 
 		final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(String notificationKey) {
+            public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
              	log.info("Barney received key: " + notificationKey);
             	expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
             	eventsReceived++;