killbill-uncached

Changes

account/pom.xml 2(+1 -1)

api/pom.xml 2(+1 -1)

beatrix/pom.xml 2(+1 -1)

catalog/pom.xml 2(+1 -1)

invoice/pom.xml 2(+1 -1)

payment/pom.xml 9(+2 -7)

payment/src/main/java/com/ning/billing/payment/PaymentAttempt.java 83(+0 -83)

payment/src/main/java/com/ning/billing/payment/PaymentInfoRequest.java 45(+0 -45)

pom.xml 4(+2 -2)

util/pom.xml 2(+1 -1)

Details

account/pom.xml 2(+1 -1)

diff --git a/account/pom.xml b/account/pom.xml
index 0ac4f64..230e8f9 100644
--- a/account/pom.xml
+++ b/account/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-account</artifactId>
diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index 1e00dba..623aa01 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.account.dao;
 
+import java.sql.DataTruncation;
 import java.util.List;
 import java.util.UUID;
 
@@ -89,7 +90,6 @@ public class DefaultAccountDao implements AccountDao {
     public void create(final Account account) throws AccountApiException {
         final String key = account.getExternalKey();
         try {
-
             accountSqlDao.inTransaction(new Transaction<Void, AccountSqlDao>() {
                 @Override
                 public Void inTransaction(final AccountSqlDao transactionalDao, final TransactionStatus status) throws AccountApiException, Bus.EventBusException {
@@ -109,6 +109,8 @@ public class DefaultAccountDao implements AccountDao {
         } catch (RuntimeException re) {
             if (re.getCause() instanceof AccountApiException) {
                 throw (AccountApiException) re.getCause();
+            } else if (re.getCause() instanceof DataTruncation) {
+                throw new AccountApiException(ErrorCode.DATA_TRUNCATION, re.getCause().getMessage());
             } else {
                 throw re;
             }
diff --git a/account/src/main/resources/com/ning/billing/account/ddl.sql b/account/src/main/resources/com/ning/billing/account/ddl.sql
index 58b47f0..fd2f6f8 100644
--- a/account/src/main/resources/com/ning/billing/account/ddl.sql
+++ b/account/src/main/resources/com/ning/billing/account/ddl.sql
@@ -17,7 +17,7 @@ CREATE TABLE accounts (
     state_or_province varchar(50) DEFAULT NULL,
     country varchar(50) DEFAULT NULL,
     postal_code varchar(11) DEFAULT NULL,
-    phone varchar(13) DEFAULT NULL,
+    phone varchar(25) DEFAULT NULL,
     created_dt datetime,
     updated_dt datetime,
     PRIMARY KEY(id)
@@ -25,3 +25,48 @@ CREATE TABLE accounts (
 CREATE UNIQUE INDEX accounts_external_key ON accounts(external_key);
 CREATE UNIQUE INDEX accounts_email ON accounts(email);
 
+DROP TABLE IF EXISTS account_history;
+CREATE TABLE account_history (
+    id char(36) NOT NULL,
+    external_key varchar(128) NULL,
+    email varchar(50) NOT NULL,
+    name varchar(100) NOT NULL,
+    first_name_length int NOT NULL,
+    currency char(3) DEFAULT NULL,
+    billing_cycle_day int DEFAULT NULL,
+    payment_provider_name varchar(20) DEFAULT NULL,
+    time_zone varchar(50) DEFAULT NULL,
+    locale varchar(5) DEFAULT NULL,
+    address1 varchar(100) DEFAULT NULL,
+    address2 varchar(100) DEFAULT NULL,
+    company_name varchar(50) DEFAULT NULL,
+    city varchar(50) DEFAULT NULL,
+    state_or_province varchar(50) DEFAULT NULL,
+    country varchar(50) DEFAULT NULL,
+    postal_code varchar(11) DEFAULT NULL,
+    phone varchar(25) DEFAULT NULL,
+    date datetime
+) ENGINE=innodb;
+CREATE INDEX account_id ON account_history(id);
+
+CREATE TRIGGER store_account_history_on_insert AFTER INSERT ON accounts
+    FOR EACH ROW
+        INSERT INTO account_history (id, external_key, email, name, first_name_length, currency,
+                                    billing_cycle_day, payment_provider_name, time_zone, locale, 
+                                    address1, address2, company_name, city, state_or_province, 
+                                    country, postal_code, phone, date)
+        VALUES (NEW.id, NEW.external_key, NEW.email, NEW.name, NEW.first_name_length, NEW.currency,
+                NEW.billing_cycle_day, NEW.payment_provider_name, NEW.time_zone, NEW.locale, 
+                NEW.address1, NEW.address2, NEW.company_name, NEW.city, NEW.state_or_province, 
+                NEW.country, NEW.postal_code, NEW.phone, NEW.created_dt);
+
+CREATE TRIGGER store_account_history_on_update AFTER UPDATE ON accounts
+    FOR EACH ROW
+        INSERT INTO account_history (id, external_key, email, name, first_name_length, currency,
+                                    billing_cycle_day, payment_provider_name, time_zone, locale, 
+                                    address1, address2, company_name, city, state_or_province, 
+                                    country, postal_code, phone, date)
+        VALUES (NEW.id, NEW.external_key, NEW.email, NEW.name, NEW.first_name_length, NEW.currency,
+                NEW.billing_cycle_day, NEW.payment_provider_name, NEW.time_zone, NEW.locale, 
+                NEW.address1, NEW.address2, NEW.company_name, NEW.city, NEW.state_or_province, 
+                NEW.country, NEW.postal_code, NEW.phone, NEW.updated_dt);
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index f4f530e..964eb6a 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -21,7 +21,10 @@ import static org.testng.Assert.fail;
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
+import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -31,6 +34,7 @@ import com.google.inject.Stage;
 import com.ning.billing.account.glue.AccountModuleWithEmbeddedDb;
 import com.ning.billing.util.bus.DefaultBusService;
 import com.ning.billing.util.bus.BusService;
+import org.testng.annotations.BeforeMethod;
 
 public abstract class AccountDaoTestBase {
     protected AccountModuleWithEmbeddedDb module;
@@ -68,4 +72,29 @@ public abstract class AccountDaoTestBase {
     {
         module.stopDb();
     }
+
+    @BeforeMethod(alwaysRun = true)
+    public void cleanupData() {
+        dbi.inTransaction(new TransactionCallback<Void>() {
+            @Override
+            public Void inTransaction(Handle h, TransactionStatus status) throws Exception {
+                h.execute("truncate table accounts");
+                h.execute("truncate table entitlement_events");
+                h.execute("truncate table subscriptions");
+                h.execute("truncate table bundles");
+                h.execute("truncate table notifications");
+                h.execute("truncate table claimed_notifications");
+                h.execute("truncate table invoices");
+                h.execute("truncate table fixed_invoice_items");
+                h.execute("truncate table recurring_invoice_items");
+                h.execute("truncate table tag_definitions");
+                h.execute("truncate table tags");
+                h.execute("truncate table custom_fields");
+                h.execute("truncate table invoice_payments");
+                h.execute("truncate table payment_attempts");
+                h.execute("truncate table payments");
+                return null;
+            }
+        });
+    }
 }
diff --git a/account/src/test/java/com/ning/billing/account/dao/TestSimpleAccountDao.java b/account/src/test/java/com/ning/billing/account/dao/TestSimpleAccountDao.java
index 1c118a9..3c726aa 100644
--- a/account/src/test/java/com/ning/billing/account/dao/TestSimpleAccountDao.java
+++ b/account/src/test/java/com/ning/billing/account/dao/TestSimpleAccountDao.java
@@ -42,7 +42,7 @@ import com.ning.billing.util.tag.dao.TagDefinitionSqlDao;
 
 @Test(groups = {"account-dao"})
 public class TestSimpleAccountDao extends AccountDaoTestBase {
-    private DefaultAccount createTestAccount() {
+    private AccountBuilder createTestAccountBuilder() {
         String thisKey = "test" + UUID.randomUUID().toString();
         String lastName = UUID.randomUUID().toString();
         String thisEmail = "me@me.com" + " " + UUID.randomUUID();
@@ -65,13 +65,12 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
                                    .locale(locale)
                                    .timeZone(timeZone)
                                    .createdDate(createdDate)
-                                   .updatedDate(updatedDate)
-                                   .build();
+                                   .updatedDate(updatedDate);
     }
 
+    @Test
     public void testBasic() throws AccountApiException {
-
-        Account a = createTestAccount();
+        Account a = createTestAccountBuilder().build();
         accountDao.create(a);
         String key = a.getExternalKey();
 
@@ -88,9 +87,26 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
         assertTrue(all.size() >= 1);
     }
 
+    // simple test to ensure long phone numbers can be stored
+    @Test
+    public void testLongPhoneNumber() throws AccountApiException {
+        Account account = createTestAccountBuilder().phone("123456789012345678901234").build();
+        accountDao.create(account);
+
+        Account saved = accountDao.getAccountByKey(account.getExternalKey());
+        assertNotNull(saved);
+    }
+
+    // simple test to ensure excessively long phone numbers cannot be stored
+    @Test(expectedExceptions = {AccountApiException.class})
+    public void testOverlyLongPhoneNumber() throws AccountApiException {
+        Account account = createTestAccountBuilder().phone("12345678901234567890123456").build();
+        accountDao.create(account);
+    }
+
     @Test
     public void testGetById() throws AccountApiException {
-        Account account = createTestAccount();
+        Account account = createTestAccountBuilder().build();
         UUID id = account.getId();
         String key = account.getExternalKey();
         String name = account.getName();
@@ -109,7 +125,7 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
 
     @Test
     public void testCustomFields() throws AccountApiException {
-        Account account = createTestAccount();
+        Account account = createTestAccountBuilder().build();
         String fieldName = "testField1";
         String fieldValue = "testField1_value";
         account.setFieldValue(fieldName, fieldValue);
@@ -124,8 +140,8 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
 
     @Test
     public void testTags() throws AccountApiException {
-        Account account = createTestAccount();
-        TagDefinition definition = new DefaultTagDefinition("Test Tag", "For testing only", "Test System", new DateTime());
+        Account account = createTestAccountBuilder().build();
+        TagDefinition definition = new DefaultTagDefinition("Test Tag", "For testing only", "Test System");
         TagDefinitionSqlDao tagDescriptionDao = dbi.onDemand(TagDefinitionSqlDao.class);
         tagDescriptionDao.create(definition);
 
@@ -146,7 +162,7 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
 
     @Test
     public void testGetIdFromKey() throws AccountApiException {
-        Account account = createTestAccount();
+        Account account = createTestAccountBuilder().build();
         accountDao.create(account);
 
         try {
@@ -159,12 +175,13 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
 
     @Test(expectedExceptions = AccountApiException.class)
     public void testGetIdFromKeyForNullKey() throws AccountApiException {
-        accountDao.getIdFromKey(null);
+        String key = null;
+        accountDao.getIdFromKey(key);
     }
 
     @Test
     public void testUpdate() throws Exception {
-        final Account account = createTestAccount();
+        final Account account = createTestAccountBuilder().build();
         accountDao.create(account);
 
         AccountData accountData = new AccountData() {
@@ -365,7 +382,7 @@ public class TestSimpleAccountDao extends AccountDaoTestBase {
     @Test(groups={"slow"},enabled=true)
     public void testDelete() throws AccountApiException {
 
-        Account a = createTestAccount();
+        Account a = createTestAccountBuilder().build();
         accountDao.create(a);
         String key = a.getExternalKey();
 
diff --git a/analytics/pom.xml b/analytics/pom.xml
index 9ee654b..10f94d4 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-analytics</artifactId>
diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index d623338..1735062 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -36,14 +36,15 @@ public class AnalyticsListener
     }
 
     @Subscribe
-    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException {
+    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException
+    {
         switch (event.getTransitionType()) {
             case MIGRATE_ENTITLEMENT:
                 // TODO do nothing for now
-            break;
+                break;
             case CREATE:
                 bstRecorder.subscriptionCreated(event);
-            break;
+                break;
             case CANCEL:
                 bstRecorder.subscriptionCancelled(event);
                 break;
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransition.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransition.java
index 8da7ff0..6c3a393 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransition.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransition.java
@@ -18,6 +18,8 @@ package com.ning.billing.analytics;
 
 import org.joda.time.DateTime;
 
+import java.util.UUID;
+
 /**
  * Describe a state change between two BusinessSubscription
  * <p/>
@@ -25,6 +27,7 @@ import org.joda.time.DateTime;
  */
 public class BusinessSubscriptionTransition
 {
+    private final UUID id;
     private final String key;
     private final String accountKey;
     private final DateTime requestedTimestamp;
@@ -32,8 +35,11 @@ public class BusinessSubscriptionTransition
     private final BusinessSubscription previousSubscription;
     private final BusinessSubscription nextSubscription;
 
-    public BusinessSubscriptionTransition(final String key, final String accountKey, final DateTime requestedTimestamp, final BusinessSubscriptionEvent event, final BusinessSubscription previousSubscription, final BusinessSubscription nextsubscription)
+    public BusinessSubscriptionTransition(final UUID id, final String key, final String accountKey, final DateTime requestedTimestamp, final BusinessSubscriptionEvent event, final BusinessSubscription previousSubscription, final BusinessSubscription nextsubscription)
     {
+        if (id == null) {
+            throw new IllegalArgumentException("An event must have an id");
+        }
         if (key == null) {
             throw new IllegalArgumentException("An event must have an key");
         }
@@ -47,6 +53,7 @@ public class BusinessSubscriptionTransition
             throw new IllegalArgumentException("No event specified");
         }
 
+        this.id = id;
         this.key = key;
         this.accountKey = accountKey;
         this.requestedTimestamp = requestedTimestamp;
@@ -55,6 +62,11 @@ public class BusinessSubscriptionTransition
         this.nextSubscription = nextsubscription;
     }
 
+    public UUID getId()
+    {
+        return id;
+    }
+
     public BusinessSubscriptionEvent getEvent()
     {
         return event;
@@ -90,10 +102,11 @@ public class BusinessSubscriptionTransition
     {
         final StringBuilder sb = new StringBuilder();
         sb.append("BusinessSubscriptionTransition");
-        sb.append("{event=").append(event);
+        sb.append("{accountKey='").append(accountKey).append('\'');
+        sb.append(", id=").append(id);
         sb.append(", key='").append(key).append('\'');
-        sb.append(", accountKey='").append(accountKey).append('\'');
         sb.append(", requestedTimestamp=").append(requestedTimestamp);
+        sb.append(", event=").append(event);
         sb.append(", previousSubscription=").append(previousSubscription);
         sb.append(", nextSubscription=").append(nextSubscription);
         sb.append('}');
@@ -112,13 +125,16 @@ public class BusinessSubscriptionTransition
 
         final BusinessSubscriptionTransition that = (BusinessSubscriptionTransition) o;
 
+        if (accountKey != null ? !accountKey.equals(that.accountKey) : that.accountKey != null) {
+            return false;
+        }
         if (event != null ? !event.equals(that.event) : that.event != null) {
             return false;
         }
-        if (key != null ? !key.equals(that.key) : that.key != null) {
+        if (id != null ? !id.equals(that.id) : that.id != null) {
             return false;
         }
-        if (accountKey != null ? !accountKey.equals(that.accountKey) : that.accountKey != null) {
+        if (key != null ? !key.equals(that.key) : that.key != null) {
             return false;
         }
         if (nextSubscription != null ? !nextSubscription.equals(that.nextSubscription) : that.nextSubscription != null) {
@@ -137,7 +153,8 @@ public class BusinessSubscriptionTransition
     @Override
     public int hashCode()
     {
-        int result = key != null ? key.hashCode() : 0;
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (accountKey != null ? accountKey.hashCode() : 0);
         result = 31 * result + (requestedTimestamp != null ? requestedTimestamp.hashCode() : 0);
         result = 31 * result + (event != null ? event.hashCode() : 0);
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
index 0384fab..438850f 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.UUID;
 
 public class BusinessSubscriptionTransitionRecorder
 {
@@ -47,37 +48,45 @@ public class BusinessSubscriptionTransitionRecorder
         this.accountApi = accountApi;
     }
 
-    public void subscriptionCreated(final SubscriptionTransition created) throws AccountApiException {
+    public void subscriptionCreated(final SubscriptionTransition created) throws AccountApiException
+    {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCreated(created.getNextPlan());
         recordTransition(event, created);
     }
 
-    public void subscriptionCancelled(final SubscriptionTransition cancelled) throws AccountApiException {
-        final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCancelled(cancelled.getNextPlan());
+    public void subscriptionCancelled(final SubscriptionTransition cancelled) throws AccountApiException
+    {
+        // cancelled.getNextPlan() is null here - need to look at the previous one to create the correct event name
+        final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCancelled(cancelled.getPreviousPlan());
         recordTransition(event, cancelled);
     }
 
-    public void subscriptionChanged(final SubscriptionTransition changed) throws AccountApiException {
+    public void subscriptionChanged(final SubscriptionTransition changed) throws AccountApiException
+    {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionChanged(changed.getNextPlan());
         recordTransition(event, changed);
     }
 
-    public void subscriptionPaused(final SubscriptionTransition paused) throws AccountApiException {
+    public void subscriptionPaused(final SubscriptionTransition paused) throws AccountApiException
+    {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionPaused(paused.getNextPlan());
         recordTransition(event, paused);
     }
 
-    public void subscriptionResumed(final SubscriptionTransition resumed) throws AccountApiException {
+    public void subscriptionResumed(final SubscriptionTransition resumed) throws AccountApiException
+    {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionResumed(resumed.getNextPlan());
         recordTransition(event, resumed);
     }
 
-    public void subscriptionPhaseChanged(final SubscriptionTransition phaseChanged) throws AccountApiException {
+    public void subscriptionPhaseChanged(final SubscriptionTransition phaseChanged) throws AccountApiException
+    {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionPhaseChanged(phaseChanged.getNextPlan(), phaseChanged.getNextState());
         recordTransition(event, phaseChanged);
     }
 
-    public void recordTransition(final BusinessSubscriptionEvent event, final SubscriptionTransition transition) throws AccountApiException {
+    public void recordTransition(final BusinessSubscriptionEvent event, final SubscriptionTransition transition) throws AccountApiException
+    {
         Currency currency = null;
         String transitionKey = null;
         String accountKey = null;
@@ -113,15 +122,24 @@ public class BusinessSubscriptionTransitionRecorder
         else {
             prevSubscription = new BusinessSubscription(transition.getPreviousPriceList(), transition.getPreviousPlan(), transition.getPreviousPhase(), currency, previousEffectiveTransitionTime, transition.getPreviousState(), transition.getSubscriptionId(), transition.getBundleId());
         }
-        final BusinessSubscription nextSubscription = new BusinessSubscription(transition.getNextPriceList(), transition.getNextPlan(), transition.getNextPhase(), currency, transition.getEffectiveTransitionTime(), transition.getNextState(), transition.getSubscriptionId(), transition.getBundleId());
+        final BusinessSubscription nextSubscription;
+
+        // next plan is null for CANCEL events
+        if (transition.getNextPlan() == null) {
+            nextSubscription = null;
+        }
+        else {
+            nextSubscription = new BusinessSubscription(transition.getNextPriceList(), transition.getNextPlan(), transition.getNextPhase(), currency, transition.getEffectiveTransitionTime(), transition.getNextState(), transition.getSubscriptionId(), transition.getBundleId());
+        }
 
-        record(transitionKey, accountKey, transition.getRequestedTransitionTime(), event, prevSubscription, nextSubscription);
+        record(transition.getId(), transitionKey, accountKey, transition.getRequestedTransitionTime(), event, prevSubscription, nextSubscription);
     }
 
     // Public for internal reasons
-    public void record(final String key, final String accountKey, final DateTime requestedDateTime, final BusinessSubscriptionEvent event, final BusinessSubscription prevSubscription, final BusinessSubscription nextSubscription)
+    public void record(final UUID id, final String key, final String accountKey, final DateTime requestedDateTime, final BusinessSubscriptionEvent event, final BusinessSubscription prevSubscription, final BusinessSubscription nextSubscription)
     {
         final BusinessSubscriptionTransition transition = new BusinessSubscriptionTransition(
+            id,
             key,
             accountKey,
             requestedDateTime,
diff --git a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionBinder.java b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionBinder.java
index 374f296..b769e5a 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionBinder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionBinder.java
@@ -43,6 +43,7 @@ public @interface BusinessSubscriptionTransitionBinder
             {
                 public void bind(final SQLStatement q, final BusinessSubscriptionTransitionBinder bind, final BusinessSubscriptionTransition arg)
                 {
+                    q.bind("event_id", arg.getId().toString());
                     q.bind("event_key", arg.getKey());
                     q.bind("account_key", arg.getAccountKey());
                     q.bind("requested_timestamp", arg.getRequestedTimestamp().getMillis());
diff --git a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionMapper.java b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionMapper.java
index a31d104..ed41ab5 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionMapper.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionMapper.java
@@ -38,20 +38,20 @@ public class BusinessSubscriptionTransitionMapper implements ResultSetMapper<Bus
     public BusinessSubscriptionTransition map(final int index, final ResultSet r, final StatementContext ctx) throws SQLException
     {
         BusinessSubscription prev = new BusinessSubscription(
-            r.getString(5), // productName
-            r.getString(6), // productType
-            r.getString(7) == null ? null : ProductCategory.valueOf(r.getString(7)), // productCategory
-            r.getString(8), // slug
-            r.getString(9),  // phase
-            r.getString(10),  // billing period
-            BigDecimal.valueOf(r.getDouble(11)), // price
-            r.getString(12), // priceList
-            BigDecimal.valueOf(r.getDouble(13)), // mrr
-            r.getString(14), // currency
-            r.getLong(15) == 0 ? null : new DateTime(r.getLong(15), DateTimeZone.UTC), // startDate
-            r.getString(16) == null ? null : SubscriptionState.valueOf(r.getString(16)), // state
-            r.getString(17) == null ? null : UUID.fromString(r.getString(17)), // subscriptionId
-            r.getString(18) == null ? null : UUID.fromString(r.getString(18)) //bundleId
+            r.getString(6), // productName
+            r.getString(7), // productType
+            r.getString(8) == null ? null : ProductCategory.valueOf(r.getString(8)), // productCategory
+            r.getString(9), // slug
+            r.getString(10),  // phase
+            r.getString(11),  // billing period
+            BigDecimal.valueOf(r.getDouble(12)), // price
+            r.getString(13), // priceList
+            BigDecimal.valueOf(r.getDouble(14)), // mrr
+            r.getString(15), // currency
+            r.getLong(16) == 0 ? null : new DateTime(r.getLong(16), DateTimeZone.UTC), // startDate
+            r.getString(17) == null ? null : SubscriptionState.valueOf(r.getString(17)), // state
+            r.getString(18) == null ? null : UUID.fromString(r.getString(18)), // subscriptionId
+            r.getString(19) == null ? null : UUID.fromString(r.getString(19)) //bundleId
         );
 
         // Avoid creating a dummy subscriptions with all null fields
@@ -60,20 +60,20 @@ public class BusinessSubscriptionTransitionMapper implements ResultSetMapper<Bus
         }
 
         BusinessSubscription next = new BusinessSubscription(
-            r.getString(19), // productName
-            r.getString(20), // productType
-            r.getString(21) == null ? null : ProductCategory.valueOf(r.getString(21)), // productCategory
-            r.getString(22), // slug8
-            r.getString(23),  // phase
-            r.getString(24),  // billing period
-            BigDecimal.valueOf(r.getDouble(25)), // price
-            r.getString(26), // priceList
-            BigDecimal.valueOf(r.getDouble(27)), // mrr
-            r.getString(28), // currency
-            r.getLong(29) == 0 ? null : new DateTime(r.getLong(29), DateTimeZone.UTC), // startDate
-            r.getString(30) == null ? null : SubscriptionState.valueOf(r.getString(30)), // state
-            r.getString(31) == null ? null : UUID.fromString(r.getString(31)), // subscriptionId
-            r.getString(32) == null ? null : UUID.fromString(r.getString(32)) //bundleId
+            r.getString(20), // productName
+            r.getString(21), // productType
+            r.getString(22) == null ? null : ProductCategory.valueOf(r.getString(22)), // productCategory
+            r.getString(23), // slug8
+            r.getString(24),  // phase
+            r.getString(25),  // billing period
+            BigDecimal.valueOf(r.getDouble(26)), // price
+            r.getString(27), // priceList
+            BigDecimal.valueOf(r.getDouble(28)), // mrr
+            r.getString(29), // currency
+            r.getLong(30) == 0 ? null : new DateTime(r.getLong(30), DateTimeZone.UTC), // startDate
+            r.getString(31) == null ? null : SubscriptionState.valueOf(r.getString(31)), // state
+            r.getString(32) == null ? null : UUID.fromString(r.getString(32)), // subscriptionId
+            r.getString(33) == null ? null : UUID.fromString(r.getString(33)) //bundleId
         );
 
         // Avoid creating a dummy subscriptions with all null fields
@@ -81,12 +81,13 @@ public class BusinessSubscriptionTransitionMapper implements ResultSetMapper<Bus
             next = null;
         }
 
-        final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.valueOf(r.getString(4));
+        final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.valueOf(r.getString(5));
 
         return new BusinessSubscriptionTransition(
-            r.getString(1),
+            UUID.fromString(r.getString(1)),
             r.getString(2),
-            new DateTime(r.getLong(3), DateTimeZone.UTC),
+            r.getString(3),
+            new DateTime(r.getLong(4), DateTimeZone.UTC),
             event,
             prev,
             next
diff --git a/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionDao.sql.stg b/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionDao.sql.stg
index 1654b5b..de6076e 100644
--- a/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionDao.sql.stg
+++ b/analytics/src/main/resources/com/ning/billing/analytics/dao/BusinessSubscriptionTransitionDao.sql.stg
@@ -2,7 +2,8 @@ group BusinessSubscriptionTransition;
 
 getTransitions(event_key) ::= <<
   select
-    event_key
+    event_id
+  , event_key
   , account_key
   , requested_timestamp
   , event
@@ -41,8 +42,9 @@ getTransitions(event_key) ::= <<
 >>
 
 createTransition() ::= <<
-  insert into bst(
-    event_key
+  insert ignore into bst(
+    event_id
+  , event_key
   , account_key
   , requested_timestamp
   , event
@@ -75,7 +77,8 @@ createTransition() ::= <<
   , next_subscription_id
   , next_bundle_id
   ) values (
-    :event_key
+    :event_id
+  , :event_key
   , :account_key
   , :requested_timestamp
   , :event
diff --git a/analytics/src/main/resources/com/ning/billing/analytics/ddl.sql b/analytics/src/main/resources/com/ning/billing/analytics/ddl.sql
index 49e48f0..6489b12 100644
--- a/analytics/src/main/resources/com/ning/billing/analytics/ddl.sql
+++ b/analytics/src/main/resources/com/ning/billing/analytics/ddl.sql
@@ -1,6 +1,7 @@
 drop table if exists bst;
 create table bst (
-  event_key varchar(50) not null
+  event_id char(36) not null
+, event_key varchar(50) not null
 , account_key varchar(50) not null
 , requested_timestamp bigint not null
 , event varchar(50) not null
@@ -32,6 +33,7 @@ create table bst (
 , next_state varchar(32) default null
 , next_subscription_id varchar(100) default null
 , next_bundle_id varchar(100) default null
+, primary key(event_id)
 ) engine=innodb;
 create index bst_key_index on bst (event_key, requested_timestamp asc);
 
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index 6054529..c413f0d 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -72,10 +72,11 @@ import static org.testng.Assert.fail;
 @Guice(modules = AnalyticsTestModule.class)
 public class TestAnalyticsService
 {
+    private static final UUID ID = UUID.randomUUID();
     private static final String KEY = "12345";
     private static final String ACCOUNT_KEY = "pierre-12345";
-    private static final DefaultTagDefinition TAG_ONE = new DefaultTagDefinition("batch20", "something", "pierre", new DateTime(DateTimeZone.UTC));
-    private static final DefaultTagDefinition TAG_TWO = new DefaultTagDefinition("awesome", "something", "pierre", new DateTime(DateTimeZone.UTC));
+    private static final DefaultTagDefinition TAG_ONE = new DefaultTagDefinition("batch20", "something", "pierre");
+    private static final DefaultTagDefinition TAG_TWO = new DefaultTagDefinition("awesome", "something", "pierre");
 
     @Inject
     private AccountUserApi accountApi;
@@ -165,7 +166,7 @@ public class TestAnalyticsService
         final String priceList = "something";
 
         transition = new SubscriptionTransitionData(
-            UUID.randomUUID(),
+            ID,
             subscriptionId,
             bundle.getId(),
             EntitlementEvent.EventType.API_USER,
@@ -182,6 +183,7 @@ public class TestAnalyticsService
             priceList
         );
         expectedTransition = new BusinessSubscriptionTransition(
+            ID,
             KEY,
             ACCOUNT_KEY,
             requestedTransitionTime,
diff --git a/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java b/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
index c17cdd4..a9a5422 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
@@ -52,6 +52,7 @@ import java.util.UUID;
 
 public class TestAnalyticsDao
 {
+    private static final UUID EVENT_ID = UUID.randomUUID();
     private static final String EVENT_KEY = "12345";
     private static final String ACCOUNT_KEY = "pierre-143343-vcc";
 
@@ -84,7 +85,7 @@ public class TestAnalyticsDao
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCancelled(plan);
         final DateTime requestedTimestamp = new DateTime(DateTimeZone.UTC);
 
-        transition = new BusinessSubscriptionTransition(EVENT_KEY, ACCOUNT_KEY, requestedTimestamp, event, prevSubscription, nextSubscription);
+        transition = new BusinessSubscriptionTransition(EVENT_ID, EVENT_KEY, ACCOUNT_KEY, requestedTimestamp, event, prevSubscription, nextSubscription);
 
         final IDBI dbi = helper.getDBI();
         businessSubscriptionTransitionDao = dbi.onDemand(BusinessSubscriptionTransitionDao.class);
@@ -130,9 +131,50 @@ public class TestAnalyticsDao
     }
 
     @Test(groups = "slow")
+    public void testHandleDuplicatedEvents()
+    {
+        final BusinessSubscriptionTransition transitionWithNullPrev = new BusinessSubscriptionTransition(
+            transition.getId(),
+            transition.getKey(),
+            transition.getAccountKey(),
+            transition.getRequestedTimestamp(),
+            transition.getEvent(),
+            null,
+            transition.getNextSubscription()
+        );
+
+        businessSubscriptionTransitionDao.createTransition(transitionWithNullPrev);
+        List<BusinessSubscriptionTransition> transitions = businessSubscriptionTransitionDao.getTransitions(EVENT_KEY);
+        Assert.assertEquals(transitions.size(), 1);
+        Assert.assertEquals(transitions.get(0), transitionWithNullPrev);
+        // Try to add the same transition, with the same UUID - we should only store one though
+        businessSubscriptionTransitionDao.createTransition(transitionWithNullPrev);
+        transitions = businessSubscriptionTransitionDao.getTransitions(EVENT_KEY);
+        Assert.assertEquals(transitions.size(), 1);
+        Assert.assertEquals(transitions.get(0), transitionWithNullPrev);
+
+        // Try now to store a look-alike transition (same fields except UUID) - we should store it this time
+        final BusinessSubscriptionTransition secondTransitionWithNullPrev = new BusinessSubscriptionTransition(
+            UUID.randomUUID(),
+            transition.getKey(),
+            transition.getAccountKey(),
+            transition.getRequestedTimestamp(),
+            transition.getEvent(),
+            null,
+            transition.getNextSubscription()
+        );
+        businessSubscriptionTransitionDao.createTransition(secondTransitionWithNullPrev);
+        transitions = businessSubscriptionTransitionDao.getTransitions(EVENT_KEY);
+        Assert.assertEquals(transitions.size(), 2);
+        Assert.assertTrue(transitions.contains(transitionWithNullPrev));
+        Assert.assertTrue(transitions.contains(secondTransitionWithNullPrev));
+    }
+
+    @Test(groups = "slow")
     public void testTransitionsWithNullPrevSubscription()
     {
         final BusinessSubscriptionTransition transitionWithNullPrev = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
@@ -151,6 +193,7 @@ public class TestAnalyticsDao
     public void testTransitionsWithNullNextSubscription()
     {
         final BusinessSubscriptionTransition transitionWithNullNext = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
@@ -170,6 +213,7 @@ public class TestAnalyticsDao
     {
         final BusinessSubscription subscriptionWithNullFields = new BusinessSubscription(null, plan, phase, Currency.USD, null, null, null, null);
         final BusinessSubscriptionTransition transitionWithNullFields = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
@@ -189,6 +233,7 @@ public class TestAnalyticsDao
     {
         final BusinessSubscription subscriptionWithNullPlanAndPhase = new BusinessSubscription(null, null, null, Currency.USD, null, null, null, null);
         final BusinessSubscriptionTransition transitionWithNullPlanAndPhase = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
@@ -213,6 +258,7 @@ public class TestAnalyticsDao
     {
         final BusinessSubscription subscriptionWithNullPlan = new BusinessSubscription(null, null, phase, Currency.USD, null, null, null, null);
         final BusinessSubscriptionTransition transitionWithNullPlan = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
@@ -233,6 +279,7 @@ public class TestAnalyticsDao
     {
         final BusinessSubscription subscriptionWithNullPhase = new BusinessSubscription(null, plan, null, Currency.USD, null, null, null, null);
         final BusinessSubscriptionTransition transitionWithNullPhase = new BusinessSubscriptionTransition(
+            transition.getId(),
             transition.getKey(),
             transition.getAccountKey(),
             transition.getRequestedTimestamp(),
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestAnalyticsListener.java b/analytics/src/test/java/com/ning/billing/analytics/TestAnalyticsListener.java
index ec6d9ae..a9a8293 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestAnalyticsListener.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestAnalyticsListener.java
@@ -17,8 +17,6 @@
 package com.ning.billing.analytics;
 
 
-import java.util.UUID;
-
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
@@ -30,13 +28,14 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.util.clock.ClockMock;
-
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.UUID;
+
 
 public class TestAnalyticsListener
 {
@@ -68,7 +67,7 @@ public class TestAnalyticsListener
         final DateTime effectiveTransitionTime = new DateTime(DateTimeZone.UTC);
         final DateTime requestedTransitionTime = new DateTime(DateTimeZone.UTC);
         final SubscriptionTransitionData firstTransition = createFirstSubscriptionTransition(requestedTransitionTime, effectiveTransitionTime);
-        final BusinessSubscriptionTransition firstBST = createExpectedFirstBST(requestedTransitionTime, effectiveTransitionTime);
+        final BusinessSubscriptionTransition firstBST = createExpectedFirstBST(firstTransition.getId(), requestedTransitionTime, effectiveTransitionTime);
         listener.handleSubscriptionTransitionChange(firstTransition);
         Assert.assertEquals(dao.getTransitions(KEY).size(), 1);
         Assert.assertEquals(dao.getTransitions(KEY).get(0), firstBST);
@@ -77,7 +76,7 @@ public class TestAnalyticsListener
         final DateTime effectivePauseTransitionTime = new DateTime(DateTimeZone.UTC);
         final DateTime requestedPauseTransitionTime = new DateTime(DateTimeZone.UTC);
         final SubscriptionTransitionData pausedSubscriptionTransition = createPauseSubscriptionTransition(effectivePauseTransitionTime, requestedPauseTransitionTime, firstTransition.getNextState());
-        final BusinessSubscriptionTransition pausedBST = createExpectedPausedBST(requestedPauseTransitionTime, effectivePauseTransitionTime, firstBST.getNextSubscription());
+        final BusinessSubscriptionTransition pausedBST = createExpectedPausedBST(pausedSubscriptionTransition.getId(), requestedPauseTransitionTime, effectivePauseTransitionTime, firstBST.getNextSubscription());
         listener.handleSubscriptionTransitionChange(pausedSubscriptionTransition);
         Assert.assertEquals(dao.getTransitions(KEY).size(), 2);
         Assert.assertEquals(dao.getTransitions(KEY).get(1), pausedBST);
@@ -86,7 +85,7 @@ public class TestAnalyticsListener
         final DateTime effectiveResumeTransitionTime = new DateTime(DateTimeZone.UTC);
         final DateTime requestedResumeTransitionTime = new DateTime(DateTimeZone.UTC);
         final SubscriptionTransitionData resumedSubscriptionTransition = createResumeSubscriptionTransition(requestedResumeTransitionTime, effectiveResumeTransitionTime, pausedSubscriptionTransition.getNextState());
-        final BusinessSubscriptionTransition resumedBST = createExpectedResumedBST(requestedResumeTransitionTime, effectiveResumeTransitionTime, pausedBST.getNextSubscription());
+        final BusinessSubscriptionTransition resumedBST = createExpectedResumedBST(resumedSubscriptionTransition.getId(), requestedResumeTransitionTime, effectiveResumeTransitionTime, pausedBST.getNextSubscription());
         listener.handleSubscriptionTransitionChange(resumedSubscriptionTransition);
         Assert.assertEquals(dao.getTransitions(KEY).size(), 3);
         Assert.assertEquals(dao.getTransitions(KEY).get(2), resumedBST);
@@ -94,41 +93,42 @@ public class TestAnalyticsListener
         // Cancel it
         final DateTime effectiveCancelTransitionTime = new DateTime(DateTimeZone.UTC);
         final DateTime requestedCancelTransitionTime = new DateTime(DateTimeZone.UTC);
-        listener.handleSubscriptionTransitionChange(createCancelSubscriptionTransition(requestedCancelTransitionTime, effectiveCancelTransitionTime, resumedSubscriptionTransition.getNextState()));
-        final BusinessSubscriptionTransition cancelledBST = createExpectedCancelledBST(requestedCancelTransitionTime, effectiveCancelTransitionTime, resumedBST.getNextSubscription());
+        final SubscriptionTransitionData cancelledSubscriptionTransition = createCancelSubscriptionTransition(requestedCancelTransitionTime, effectiveCancelTransitionTime, resumedSubscriptionTransition.getNextState());
+        final BusinessSubscriptionTransition cancelledBST = createExpectedCancelledBST(cancelledSubscriptionTransition.getId(), requestedCancelTransitionTime, effectiveCancelTransitionTime, resumedBST.getNextSubscription());
+        listener.handleSubscriptionTransitionChange(cancelledSubscriptionTransition);
         Assert.assertEquals(dao.getTransitions(KEY).size(), 4);
         Assert.assertEquals(dao.getTransitions(KEY).get(3), cancelledBST);
     }
 
-    private BusinessSubscriptionTransition createExpectedFirstBST(final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime)
+    private BusinessSubscriptionTransition createExpectedFirstBST(final UUID id, final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime)
     {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCreated(plan);
         final Subscription.SubscriptionState subscriptionState = Subscription.SubscriptionState.ACTIVE;
-        return createExpectedBST(event, requestedTransitionTime, effectiveTransitionTime, null, subscriptionState);
+        return createExpectedBST(id, event, requestedTransitionTime, effectiveTransitionTime, null, subscriptionState);
     }
 
-    private BusinessSubscriptionTransition createExpectedPausedBST(final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
+    private BusinessSubscriptionTransition createExpectedPausedBST(final UUID id, final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
     {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionPaused(plan);
         final Subscription.SubscriptionState subscriptionState = Subscription.SubscriptionState.PAUSED;
-        return createExpectedBST(event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, subscriptionState);
+        return createExpectedBST(id, event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, subscriptionState);
     }
 
-    private BusinessSubscriptionTransition createExpectedResumedBST(final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
+    private BusinessSubscriptionTransition createExpectedResumedBST(final UUID id, final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
     {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionResumed(plan);
         final Subscription.SubscriptionState nextState = Subscription.SubscriptionState.ACTIVE;
-        return createExpectedBST(event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, nextState);
+        return createExpectedBST(id, event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, nextState);
     }
 
-    private BusinessSubscriptionTransition createExpectedCancelledBST(final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
+    private BusinessSubscriptionTransition createExpectedCancelledBST(final UUID id, final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final BusinessSubscription lastSubscription)
     {
         final BusinessSubscriptionEvent event = BusinessSubscriptionEvent.subscriptionCancelled(plan);
-        final Subscription.SubscriptionState nextState = Subscription.SubscriptionState.CANCELLED;
-        return createExpectedBST(event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, nextState);
+        return createExpectedBST(id, event, requestedTransitionTime, effectiveTransitionTime, lastSubscription, null);
     }
 
     private BusinessSubscriptionTransition createExpectedBST(
+        final UUID eventId,
         final BusinessSubscriptionEvent eventType,
         final DateTime requestedTransitionTime,
         final DateTime effectiveTransitionTime,
@@ -137,12 +137,13 @@ public class TestAnalyticsListener
     )
     {
         return new BusinessSubscriptionTransition(
+            eventId,
             KEY,
             ACCOUNT_KEY,
             requestedTransitionTime,
             eventType,
             previousSubscription,
-            new BusinessSubscription(
+            nextState == null ? null : new BusinessSubscription(
                 null,
                 plan,
                 phase,
@@ -195,8 +196,24 @@ public class TestAnalyticsListener
     private SubscriptionTransitionData createCancelSubscriptionTransition(final DateTime requestedTransitionTime, final DateTime effectiveTransitionTime, final Subscription.SubscriptionState previousState)
     {
         final ApiEventType eventType = ApiEventType.CANCEL;
-        final Subscription.SubscriptionState nextState = Subscription.SubscriptionState.CANCELLED;
-        return createSubscriptionTransition(eventType, requestedTransitionTime, effectiveTransitionTime, previousState, nextState);
+        // next state is null for canceled events
+        return new SubscriptionTransitionData(
+            UUID.randomUUID(),
+            subscriptionId,
+            bundleUUID,
+            EntitlementEvent.EventType.API_USER,
+            eventType,
+            requestedTransitionTime,
+            effectiveTransitionTime,
+            previousState,
+            plan,
+            phase,
+            priceList,
+            null,
+            null,
+            null,
+            null
+        );
     }
 
     private SubscriptionTransitionData createSubscriptionTransition(
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessSubscriptionTransition.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessSubscriptionTransition.java
index a8a954c..3b33bef 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessSubscriptionTransition.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessSubscriptionTransition.java
@@ -28,6 +28,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.UUID;
+
 import static com.ning.billing.catalog.api.Currency.USD;
 
 public class TestBusinessSubscriptionTransition
@@ -36,6 +38,7 @@ public class TestBusinessSubscriptionTransition
     private BusinessSubscription nextSubscription;
     private BusinessSubscriptionEvent event;
     private DateTime requestedTimestamp;
+    private UUID id;
     private String key;
     private String accountKey;
     private BusinessSubscriptionTransition transition;
@@ -53,9 +56,10 @@ public class TestBusinessSubscriptionTransition
         nextSubscription = new BusinessSubscription(nextISubscription, USD);
         event = BusinessSubscriptionEvent.subscriptionCancelled(prevISubscription.getCurrentPlan());
         requestedTimestamp = new DateTime(DateTimeZone.UTC);
+        id = UUID.randomUUID();
         key = "1234";
         accountKey = "pierre-1234";
-        transition = new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
+        transition = new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
     }
 
     @Test(groups = "fast")
@@ -76,22 +80,22 @@ public class TestBusinessSubscriptionTransition
 
         BusinessSubscriptionTransition otherTransition;
 
-        otherTransition = new BusinessSubscriptionTransition(key, accountKey, new DateTime(), event, prevSubscription, nextSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, key, accountKey, new DateTime(), event, prevSubscription, nextSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
 
-        otherTransition = new BusinessSubscriptionTransition("12345", accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, "12345", accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
 
-        otherTransition = new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, BusinessSubscriptionEvent.subscriptionPaused(null), prevSubscription, nextSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, BusinessSubscriptionEvent.subscriptionPaused(null), prevSubscription, nextSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
 
-        otherTransition = new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, event, prevSubscription, prevSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, event, prevSubscription, prevSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
 
-        otherTransition = new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, event, nextSubscription, nextSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, event, nextSubscription, nextSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
 
-        otherTransition = new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, event, nextSubscription, prevSubscription);
+        otherTransition = new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, event, nextSubscription, prevSubscription);
         Assert.assertTrue(!transition.equals(otherTransition));
     }
 
@@ -99,7 +103,15 @@ public class TestBusinessSubscriptionTransition
     public void testRejectInvalidTransitions() throws Exception
     {
         try {
-            new BusinessSubscriptionTransition(null, accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
+            new BusinessSubscriptionTransition(null, key, accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
+            Assert.fail();
+        }
+        catch (IllegalArgumentException e) {
+            Assert.assertTrue(true);
+        }
+
+        try {
+            new BusinessSubscriptionTransition(id, null, accountKey, requestedTimestamp, event, prevSubscription, nextSubscription);
             Assert.fail();
         }
         catch (IllegalArgumentException e) {
@@ -107,7 +119,7 @@ public class TestBusinessSubscriptionTransition
         }
 
         try {
-            new BusinessSubscriptionTransition(key, accountKey, null, event, prevSubscription, nextSubscription);
+            new BusinessSubscriptionTransition(id, key, accountKey, null, event, prevSubscription, nextSubscription);
             Assert.fail();
         }
         catch (IllegalArgumentException e) {
@@ -115,7 +127,7 @@ public class TestBusinessSubscriptionTransition
         }
 
         try {
-            new BusinessSubscriptionTransition(key, accountKey, requestedTimestamp, null, prevSubscription, nextSubscription);
+            new BusinessSubscriptionTransition(id, key, accountKey, requestedTimestamp, null, prevSubscription, nextSubscription);
             Assert.fail();
         }
         catch (IllegalArgumentException e) {

api/pom.xml 2(+1 -1)

diff --git a/api/pom.xml b/api/pom.xml
index ca5b331..75e229b 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-api</artifactId>
diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index 5482e0d..fdcaa23 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -22,7 +22,7 @@ public enum ErrorCode {
      * Range 0 : COMMON EXCEPTIONS
      */
     NOT_IMPLEMENTED(1, "Api not implemented yet"),
-
+    DATA_TRUNCATION(2, "Data truncation error. (%s)"),
     /*
      *
      * Range 1000 : ENTITLEMENTS
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index 363e483..977d6f7 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -39,4 +39,6 @@ public interface InvoiceUserApi {
     public void notifyOfPaymentAttempt(InvoicePayment invoicePayment);
 
     public Collection<Invoice> getUnpaidInvoicesByAccountId(UUID accountId, DateTime upToDate);
+    
+    public Invoice triggerInvoiceGeneration(UUID accountId, DateTime targetDate, boolean dryrun) throws InvoiceApiException;
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
index a9e25dc..572e362 100644
--- a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
@@ -192,4 +192,10 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
     public String getMaskNumber() {
       return maskNumber;
     }
+
+    @Override
+    public String toString() {
+        return "CreditCardPaymentMethodInfo [cardHolderName=" + cardHolderName + ", cardType=" + cardType + ", expirationDate=" + expirationDate + ", maskNumber=" + maskNumber + ", cardAddress1=" + cardAddress1 + ", cardAddress2=" + cardAddress2 + ", cardCity=" + cardCity + ", cardState=" + cardState + ", cardPostalCode=" + cardPostalCode + ", cardCountry=" + cardCountry + "]";
+    }
+
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index 58e9898..fd84927 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -17,6 +17,7 @@
 package com.ning.billing.payment.api;
 
 import java.util.List;
+import java.util.UUID;
 
 import javax.annotation.Nullable;
 
@@ -38,6 +39,7 @@ public interface PaymentApi {
 
     List<Either<PaymentError, PaymentInfo>> createPayment(String accountKey, List<String> invoiceIds);
     List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds);
+    Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId);
 
     List<Either<PaymentError, PaymentInfo>> createRefund(Account account, List<String> invoiceIds); //TODO
 
@@ -49,4 +51,10 @@ public interface PaymentApi {
 
     PaymentAttempt getPaymentAttemptForPaymentId(String id);
 
+    List<PaymentInfo> getPaymentInfo(List<String> invoiceIds);
+
+    PaymentAttempt getPaymentAttemptForInvoiceId(String invoiceId);
+
+    PaymentInfo getPaymentInfoForPaymentAttemptId(String paymentAttemptId);
+
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java b/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
index b5df043..fcccf9b 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
@@ -265,6 +265,7 @@ public class PaymentAttempt {
                                       createdDate,
                                       updatedDate);
         }
+
     }
 
     @Override
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentService.java b/api/src/main/java/com/ning/billing/payment/api/PaymentService.java
index 988a00a..ede2506 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentService.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentService.java
@@ -23,4 +23,5 @@ public interface PaymentService extends KillbillService {
     String getName();
 
     PaymentApi getPaymentApi();
+
 }
diff --git a/api/src/main/java/com/ning/billing/util/tag/TagDefinition.java b/api/src/main/java/com/ning/billing/util/tag/TagDefinition.java
index d5cc5c1..1e17866 100644
--- a/api/src/main/java/com/ning/billing/util/tag/TagDefinition.java
+++ b/api/src/main/java/com/ning/billing/util/tag/TagDefinition.java
@@ -24,7 +24,5 @@ public interface TagDefinition extends Entity {
 
     String getCreatedBy();
 
-    DateTime getCreationDate();
-
     String getDescription();
 }

beatrix/pom.xml 2(+1 -1)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 64a7b38..cf5164f 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-beatrix</artifactId>
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
index d04d768..22065ac 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
@@ -22,8 +22,6 @@ import static org.testng.Assert.assertTrue;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -31,13 +29,11 @@ import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 
-import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceItem;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Days;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -79,6 +75,13 @@ import com.ning.billing.util.bus.BusService;
 
 @Guice(modules = {MockModule.class})
 public class TestBasic {
+    private static final int NUMBER_OF_DECIMALS = 4;
+    private static final int ROUNDING_METHOD = BigDecimal.ROUND_HALF_EVEN;
+
+    private static final BigDecimal ONE = new BigDecimal("1.0000").setScale(NUMBER_OF_DECIMALS);
+    private static final BigDecimal TWENTY_NINE = new BigDecimal("29.0000").setScale(NUMBER_OF_DECIMALS);
+    private static final BigDecimal THIRTY = new BigDecimal("30.0000").setScale(NUMBER_OF_DECIMALS);
+    private static final BigDecimal THIRTY_ONE = new BigDecimal("31.0000").setScale(NUMBER_OF_DECIMALS);
 
     private static final Logger log = LoggerFactory.getLogger(TestBasic.class);
     private static long AT_LEAST_ONE_MONTH_MS =  31L * 24L * 3600L * 1000L;
@@ -114,8 +117,6 @@ public class TestBasic {
 
     private TestBusHandler busHandler;
 
-
-
     private void setupMySQL() throws IOException
     {
 
@@ -208,17 +209,18 @@ public class TestBasic {
 
     private void verifyTestResult(UUID accountId, UUID subscriptionId,
                                   DateTime startDate, DateTime endDate,
-                                  BigDecimal amount, DateTime chargeThroughDate) {
+                                  BigDecimal amount, DateTime chargeThroughDate,
+                                  int totalInvoiceItemCount) {
         SubscriptionData subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscriptionId);
 
         List<InvoiceItem> invoiceItems = invoiceUserApi.getInvoiceItemsByAccount(accountId);
+        assertEquals(invoiceItems.size(), totalInvoiceItemCount);
+
         boolean wasFound = false;
 
-        Iterator<InvoiceItem> invoiceItemIterator = invoiceItems.iterator();
-        while (invoiceItemIterator.hasNext()) {
-            InvoiceItem item = invoiceItemIterator.next();
-            if (item.getStartDate().compareTo(removeMillis(startDate)) == 0) {
-                if (item.getEndDate().compareTo(removeMillis(endDate)) == 0) {
+        for (InvoiceItem item : invoiceItems) {
+            if (item.getStartDate().compareTo(startDate) == 0) {
+                if (item.getEndDate().compareTo(endDate) == 0) {
                     if (item.getAmount().compareTo(amount) == 0) {
                         wasFound = true;
                         break;
@@ -233,32 +235,28 @@ public class TestBasic {
         assertNotNull(ctd);
         log.info("Checking CTD: " + ctd.toString() + "; clock is " + clock.getUTCNow().toString());
         assertTrue(clock.getUTCNow().isBefore(ctd));
-        assertTrue(ctd.compareTo(removeMillis(chargeThroughDate)) == 0);
+        assertTrue(ctd.compareTo(chargeThroughDate) == 0);
     }
 
-    private DateTime removeMillis(DateTime input) {
-        return input.toMutableDateTime().millisOfSecond().set(0).toDateTime();
-    }
-
-    @Test(groups = "fast", enabled = false)
+    @Test(groups = "fast", enabled = true)
     public void testBasePlanCompleteWithBillingDayInPast() throws Exception {
         DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
         testBasePlanComplete(startDate, 31, false);
     }
 
-    @Test(groups = "fast", enabled = false)
+    @Test(groups = "fast", enabled = true)
     public void testBasePlanCompleteWithBillingDayPresent() throws Exception {
         DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
         testBasePlanComplete(startDate, 1, false);
     }
 
-    @Test(groups = "fast", enabled = false)
+    @Test(groups = "fast", enabled = true)
     public void testBasePlanCompleteWithBillingDayAlignedWithTrial() throws Exception {
         DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
         testBasePlanComplete(startDate, 2, false);
     }
 
-    @Test(groups = "fast", enabled = false)
+    @Test(groups = "fast", enabled = true)
     public void testBasePlanCompleteWithBillingDayInFuture() throws Exception {
         DateTime startDate = new DateTime(2012, 2, 1, 0, 3, 42, 0);
         testBasePlanComplete(startDate, 3, true);
@@ -268,11 +266,10 @@ public class TestBasic {
         Thread.sleep(600000);
     }
 
-    @Test(groups = "stress", enabled = false)
+    @Test(groups = "stress", enabled = true)
     public void stressTest() throws Exception {
         final int maxIterations = 7;
-        int curIteration = maxIterations;
-        for (curIteration = 0; curIteration < maxIterations; curIteration++) {
+        for (int curIteration = 0; curIteration < maxIterations; curIteration++) {
             log.info("################################  ITERATION " + curIteration + "  #########################");
             Thread.sleep(1000);
             setupTest();
@@ -291,14 +288,15 @@ public class TestBasic {
 
     private void testBasePlanComplete(DateTime initialCreationDate, int billingDay,
                                       boolean proRationExpected) throws Exception {
-        long DELAY = 5000 * 10;
+        long DELAY = 5000;
 
+        log.info("Beginning test with BCD of " + billingDay);
         Account account = accountUserApi.createAccount(getAccountData(billingDay), null, null);
         UUID accountId = account.getId();
         assertNotNull(account);
 
         // set clock to the initial start date
-        clock.setDeltaFromReality(initialCreationDate.getMillis() - DateTime.now().getMillis());
+        clock.setDeltaFromReality(initialCreationDate.getMillis() - clock.getUTCNow().getMillis());
         SubscriptionBundle bundle = entitlementUserApi.createBundleForAccount(account.getId(), "whatever");
 
         String productName = "Shotgun";
@@ -315,15 +313,15 @@ public class TestBasic {
         assertNotNull(subscription);
 
         assertTrue(busHandler.isCompleted(DELAY));
-        log.info("testSimple passed first busHandler checkpoint.");
 
         //
         // VERIFY CTD HAS BEEN SET
         //
         DateTime startDate = subscription.getCurrentPhaseStart();
         DateTime endDate = startDate.plusDays(30);
-        BigDecimal price = subscription.getCurrentPhase().getFixedPrice().getPrice(Currency.USD);
-        verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
+        BigDecimal rate = subscription.getCurrentPhase().getFixedPrice().getPrice(Currency.USD);
+        int invoiceItemCount = 1;
+        verifyTestResult(accountId, subscription.getId(), startDate, endDate, rate, endDate, invoiceItemCount);
 
         //
         // CHANGE PLAN IMMEDIATELY AND EXPECT BOTH EVENTS: NextEvent.CHANGE NextEvent.INVOICE
@@ -337,15 +335,14 @@ public class TestBasic {
         subscription.changePlan(newProductName, newTerm, newPlanSetName, clock.getUTCNow());
 
         assertTrue(busHandler.isCompleted(DELAY));
-        log.info("testSimple passed second busHandler checkpoint.");
 
         //
         // VERIFY AGAIN CTD HAS BEEN SET
         //
         startDate = subscription.getCurrentPhaseStart();
-        endDate = startDate.plusMonths(1);
-        price = subscription.getCurrentPhase().getFixedPrice().getPrice(Currency.USD);
-        verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
+        endDate = startDate.plusDays(30);
+        invoiceItemCount = 2;
+        verifyTestResult(accountId, subscription.getId(), startDate, endDate, rate, endDate, invoiceItemCount);
 
         //
         // MOVE TIME TO AFTER TRIAL AND EXPECT BOTH EVENTS :  NextEvent.PHASE NextEvent.INVOICE
@@ -359,10 +356,50 @@ public class TestBasic {
             busHandler.pushExpectedEvent(NextEvent.PAYMENT);
         }
 
-        clock.setDeltaFromReality(AT_LEAST_ONE_MONTH_MS);
+        clock.addDeltaFromReality(AT_LEAST_ONE_MONTH_MS);
 
         assertTrue(busHandler.isCompleted(DELAY));
 
+        startDate = subscription.getCurrentPhaseStart();
+        rate = subscription.getCurrentPhase().getRecurringPrice().getPrice(Currency.USD);
+        BigDecimal price;
+        DateTime chargeThroughDate;
+
+        switch (billingDay) {
+            case 1:
+                // this will result in a 30-day pro-ration
+                price = THIRTY.divide(THIRTY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD).multiply(rate).setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+                chargeThroughDate = startDate.plusMonths(1).toMutableDateTime().dayOfMonth().set(billingDay).toDateTime();
+                invoiceItemCount += 1;
+                verifyTestResult(accountId, subscription.getId(), startDate, chargeThroughDate, price, chargeThroughDate, invoiceItemCount);
+                break;
+            case 2:
+                // this will result in one full-period invoice item
+                price = rate;
+                chargeThroughDate = startDate.plusMonths(1);
+                invoiceItemCount += 1;
+                verifyTestResult(accountId, subscription.getId(), startDate, chargeThroughDate, price, chargeThroughDate, invoiceItemCount);
+                break;
+            case 3:
+                // this will result in a 1-day leading pro-ration and a full-period invoice item
+                price = ONE.divide(TWENTY_NINE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD).multiply(rate).setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+                DateTime firstEndDate = startDate.plusDays(1);
+                chargeThroughDate = firstEndDate.plusMonths(1);
+                invoiceItemCount += 2;
+                verifyTestResult(accountId, subscription.getId(), startDate, firstEndDate, price, chargeThroughDate, invoiceItemCount);
+                verifyTestResult(accountId, subscription.getId(), firstEndDate, chargeThroughDate, rate, chargeThroughDate, invoiceItemCount);
+                break;
+            case 31:
+                // this will result in a 29-day pro-ration
+                chargeThroughDate = startDate.toMutableDateTime().dayOfMonth().set(31).toDateTime();
+                price = TWENTY_NINE.divide(THIRTY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD).multiply(rate).setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+                invoiceItemCount += 1;
+                verifyTestResult(accountId, subscription.getId(), startDate, chargeThroughDate, price, chargeThroughDate, invoiceItemCount);
+                break;
+            default:
+                throw new UnsupportedOperationException();
+        }
+
         //
         // CHANGE PLAN EOT AND EXPECT NOTHING
         //
@@ -371,7 +408,6 @@ public class TestBasic {
         newProductName = "Pistol";
         subscription = (SubscriptionData) entitlementUserApi.getSubscriptionFromId(subscription.getId());
         subscription.changePlan(newProductName, newTerm, newPlanSetName, clock.getUTCNow());
-        log.info("testSimple has passed third busHandler checkpoint (no events)");
 
         //
         // MOVE TIME AFTER CTD AND EXPECT BOTH EVENTS : NextEvent.CHANGE NextEvent.INVOICE
@@ -385,20 +421,34 @@ public class TestBasic {
         //waitForDebug();
 
         assertTrue(busHandler.isCompleted(DELAY));
-        log.info("testSimple passed fourth busHandler checkpoint.");
+
+        startDate = chargeThroughDate;
+        endDate = chargeThroughDate.plusMonths(1);
+        price = subscription.getCurrentPhase().getRecurringPrice().getPrice(Currency.USD);
+        invoiceItemCount += 1;
+        verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate, invoiceItemCount);
 
         //
         // MOVE TIME AFTER NEXT BILL CYCLE DAY AND EXPECT EVENT : NextEvent.INVOICE
         //
         int maxCycles = 3;
-        startDate = endDate;
-        endDate = startDate.plusMonths(1);
         do {
             busHandler.pushExpectedEvent(NextEvent.INVOICE);
             busHandler.pushExpectedEvent(NextEvent.PAYMENT);
             clock.addDeltaFromReality(AT_LEAST_ONE_MONTH_MS + 1000);
             assertTrue(busHandler.isCompleted(DELAY));
-            verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate);
+
+            startDate = endDate;
+            endDate = startDate.plusMonths(1);
+            if (endDate.dayOfMonth().get() != billingDay) {
+                // adjust for end of month issues
+                int maximumDay = endDate.dayOfMonth().getMaximumValue();
+                int newDay = (maximumDay < billingDay) ? maximumDay : billingDay;
+                endDate = endDate.toMutableDateTime().dayOfMonth().set(newDay).toDateTime();
+            }
+
+            invoiceItemCount += 1;
+            verifyTestResult(accountId, subscription.getId(), startDate, endDate, price, endDate, invoiceItemCount);
         } while (maxCycles-- > 0);
 
         //
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBusHandler.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBusHandler.java
index 57a0d6e..edc38a1 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBusHandler.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBusHandler.java
@@ -167,20 +167,22 @@ public class TestBusHandler {
 
     private void assertEqualsNicely(NextEvent received) {
 
-        boolean foundIt = false;
-        Iterator<NextEvent> it = nextExpectedEvent.iterator();
-        while (it.hasNext()) {
-            NextEvent ev = it.next();
-            if (ev == received) {
-                it.remove();
-                foundIt = true;
-                break;
+        synchronized(this) {
+            boolean foundIt = false;
+            Iterator<NextEvent> it = nextExpectedEvent.iterator();
+            while (it.hasNext()) {
+                NextEvent ev = it.next();
+                if (ev == received) {
+                    it.remove();
+                    foundIt = true;
+                    break;
+                }
+            }
+            if (!foundIt) {
+                Joiner joiner = Joiner.on(" ");
+                log.error("TestBusHandler Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
+                Assert.fail();
             }
-        }
-        if (!foundIt) {
-            Joiner joiner = Joiner.on(" ");
-            log.error("TestBusHandler Received event " + received + "; expected " + joiner.join(nextExpectedEvent));
-            Assert.fail();
         }
     }
 }

catalog/pom.xml 2(+1 -1)

diff --git a/catalog/pom.xml b/catalog/pom.xml
index 5d19c5b..9c7a5a0 100644
--- a/catalog/pom.xml
+++ b/catalog/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-catalog</artifactId>
diff --git a/catalog/src/main/java/com/ning/billing/catalog/DefaultPlanPhase.java b/catalog/src/main/java/com/ning/billing/catalog/DefaultPlanPhase.java
index 6d00162..5523e12 100644
--- a/catalog/src/main/java/com/ning/billing/catalog/DefaultPlanPhase.java
+++ b/catalog/src/main/java/com/ning/billing/catalog/DefaultPlanPhase.java
@@ -77,7 +77,7 @@ public class DefaultPlanPhase extends ValidatingConfig<StandaloneCatalog> implem
 	 * @see com.ning.billing.catalog.IPlanPhase#getRecurringPrice()
 	 */
     @Override
-	public InternationalPrice getRecurringPrice() {
+	public DefaultInternationalPrice getRecurringPrice() {
         return recurringPrice;
     }
 
diff --git a/catalog/src/test/java/com/ning/billing/catalog/MockCatalog.java b/catalog/src/test/java/com/ning/billing/catalog/MockCatalog.java
index 5fc51b8..092a863 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/MockCatalog.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/MockCatalog.java
@@ -32,9 +32,9 @@ public class MockCatalog extends StandaloneCatalog {
 	
 	public MockCatalog() {
 		setEffectiveDate(new Date());
-		populateProducts();
+		setProducts(MockProduct.createAll());
+		setPlans(MockPlan.createAll());
 		populateRules();
-		populatePlans();
 		populatePriceLists();
 	}
 	
@@ -51,25 +51,6 @@ public class MockCatalog extends StandaloneCatalog {
 		
 	}
 
-	public void populateProducts() {
-		String[] names = getProductNames();
-		DefaultProduct[] products = new DefaultProduct[names.length];
-		for(int i = 0; i < names.length; i++) {
-			products[i] = new DefaultProduct(names[i], ProductCategory.BASE);
-		}
-		setProducts(products);
-	}
-	
-	public void populatePlans() {
-		DefaultProduct[] products = getCurrentProducts();
-		DefaultPlan[] plans = new DefaultPlan[products.length];
-		for(int i = 0; i < products.length; i++) {
-			DefaultPlanPhase phase = new DefaultPlanPhase().setPhaseType(PhaseType.EVERGREEN).setBillingPeriod(BillingPeriod.MONTHLY).setRecurringPrice(new DefaultInternationalPrice());
-			plans[i] = new MockPlan().setName(products[i].getName().toLowerCase() + "-plan").setProduct(products[i]).setFinalPhase(phase);
-		}
-		setPlans(plans);
-	}
-
 	public void populatePriceLists() {
 		DefaultPlan[] plans = getCurrentPlans();
 		
diff --git a/catalog/src/test/java/com/ning/billing/catalog/MockInternationalPrice.java b/catalog/src/test/java/com/ning/billing/catalog/MockInternationalPrice.java
index cc3a679..3df36aa 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/MockInternationalPrice.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/MockInternationalPrice.java
@@ -16,16 +16,22 @@
 
 package com.ning.billing.catalog;
 
-import com.ning.billing.catalog.api.Currency;
-
 import java.math.BigDecimal;
 
+import com.ning.billing.catalog.api.Currency;
+
 public class MockInternationalPrice extends DefaultInternationalPrice {
+	
+	public static MockInternationalPrice create0USD() {
+		return new MockInternationalPrice(new DefaultPrice().setCurrency(Currency.USD).setValue(BigDecimal.ZERO));
+	}
+	
+	public static MockInternationalPrice create1USD() {
+		return new MockInternationalPrice(new DefaultPrice().setCurrency(Currency.USD).setValue(BigDecimal.ONE));
+	}
 
-	public MockInternationalPrice() {
-		setPrices(new DefaultPrice[] {
-			new DefaultPrice().setCurrency(Currency.USD).setValue(new BigDecimal(1))
-		});
+	public static MockInternationalPrice createUSD(String value) {
+		return new MockInternationalPrice(new DefaultPrice().setCurrency(Currency.USD).setValue(new BigDecimal(value)));
 	}
 
 	public MockInternationalPrice(DefaultPrice... price) {
diff --git a/catalog/src/test/java/com/ning/billing/catalog/MockPlan.java b/catalog/src/test/java/com/ning/billing/catalog/MockPlan.java
index 53c73fe..58483c4 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/MockPlan.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/MockPlan.java
@@ -16,7 +16,65 @@
 
 package com.ning.billing.catalog;
 
+
 public class MockPlan extends DefaultPlan {
+	
+	public static MockPlan createBicycleTrialEvergreen1USD(int trialDurationInDays) {
+		return new MockPlan("BicycleTrialEvergreen1USD",
+				MockProduct.createBicycle(),
+				new DefaultPlanPhase[]{ MockPlanPhase.createTrial(trialDurationInDays) },
+				MockPlanPhase.create1USDMonthlyEvergreen(),
+				-1);
+	}
+
+	
+	public static MockPlan createBicycleTrialEvergreen1USD() {
+		return new MockPlan("BicycleTrialEvergreen1USD",
+				MockProduct.createBicycle(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial() },
+				MockPlanPhase.create1USDMonthlyEvergreen(),
+				-1);
+	}
+
+	public static MockPlan createSportsCarTrialEvergreen100USD() {
+		return new MockPlan("SportsCarTrialEvergreen100USD",
+				MockProduct.createSportsCar(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial() },
+				MockPlanPhase.createUSDMonthlyEvergreen("100.00",null),
+				-1);
+	}
+	
+	public static MockPlan createPickupTrialEvergreen10USD() {
+		return new MockPlan("PickupTrialEvergreen10USD",
+				MockProduct.createPickup(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial() },
+				MockPlanPhase.createUSDMonthlyEvergreen("10.00",null),
+				-1);
+	}
+
+	public static MockPlan createJetTrialEvergreen1000USD() {
+		return new MockPlan("JetTrialEvergreen1000USD",
+				MockProduct.createJet(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial() },
+				MockPlanPhase.create1USDMonthlyEvergreen(),
+				-1);
+	}
+
+	public static MockPlan createJetTrialFixedTermEvergreen1000USD() {
+		return new MockPlan("JetTrialEvergreen1000USD",
+				MockProduct.createJet(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial(), MockPlanPhase.createUSDMonthlyFixedTerm("500.00", null, 6) },
+				MockPlanPhase.create1USDMonthlyEvergreen(),
+				-1);
+	}
+
+	public MockPlan() {
+		this("BicycleTrialEvergreen1USD",
+				MockProduct.createBicycle(),
+				new DefaultPlanPhase[]{ MockPlanPhase.create30DayTrial()},
+				MockPlanPhase.create1USDMonthlyEvergreen(),
+				-1);
+	}
 
 	public MockPlan(String name, DefaultProduct product, DefaultPlanPhase[] planPhases, DefaultPlanPhase finalPhase, int plansAllowedInBundle) {
 		setName(name);
@@ -24,31 +82,51 @@ public class MockPlan extends DefaultPlan {
 		setFinalPhase(finalPhase);
 		setInitialPhases(planPhases);
 		setPlansAllowedInBundle(plansAllowedInBundle);
-	}
-	
-	public MockPlan() {
-		setName("test-plan");
-		setProduct(new MockProduct());
-		setFinalPhase(new MockPlanPhase(this));
-		setInitialPhases(null);
-		setPlansAllowedInBundle(1);
+		
+		finalPhase.setPlan(this);
+		for (DefaultPlanPhase pp : planPhases) {
+			pp.setPlan(this);
+		}
 	}
 
-    public MockPlan(String planName) {
-		setName(planName);
-		setProduct(new MockProduct());
-		setFinalPhase(new MockPlanPhase(this));
-		setInitialPhases(null);
-		setPlansAllowedInBundle(1);
-	}
 
+
+	public static MockPlan createBicycleNoTrialEvergreen1USD() {
+		return new MockPlan("BicycleNoTrialEvergreen1USD",
+				MockProduct.createBicycle(),
+				new DefaultPlanPhase[]{ },
+				MockPlanPhase.createUSDMonthlyEvergreen("1.0", null)	,
+				-1);
+	}
+	
 	public MockPlan(MockPlanPhase mockPlanPhase) {
-		setName("test-plan");
-		setProduct(new MockProduct());
+		setName("Test");
+		setProduct(MockProduct.createBicycle());
 		setFinalPhase(mockPlanPhase);
-		setInitialPhases(null);
-		setPlansAllowedInBundle(1);
+		
+		mockPlanPhase.setPlan(this);
 	}
 
+	  public MockPlan(String planName) {
+			setName(planName);
+			setProduct(new MockProduct());
+			setFinalPhase(new MockPlanPhase(this));
+			setInitialPhases(null);
+			setPlansAllowedInBundle(1);
+	  }
+
+
+	public static DefaultPlan[] createAll() {
+		return new MockPlan[]{
+				createBicycleTrialEvergreen1USD(),
+				createBicycleNoTrialEvergreen1USD(),
+				createPickupTrialEvergreen10USD(),
+				createSportsCarTrialEvergreen100USD(),
+				createJetTrialEvergreen1000USD(),
+				createJetTrialFixedTermEvergreen1000USD()
+		};
+	}
+	
+	
 
 }
diff --git a/catalog/src/test/java/com/ning/billing/catalog/MockPlanPhase.java b/catalog/src/test/java/com/ning/billing/catalog/MockPlanPhase.java
index d4ae5ff..b995bef 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/MockPlanPhase.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/MockPlanPhase.java
@@ -16,14 +16,51 @@
 
 package com.ning.billing.catalog;
 
+import javax.annotation.Nullable;
+
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.PhaseType;
 import com.ning.billing.catalog.api.Plan;
 import com.ning.billing.catalog.api.TimeUnit;
 
-import javax.annotation.Nullable;
-
 public class MockPlanPhase extends DefaultPlanPhase {
+	
+	public static MockPlanPhase create1USDMonthlyEvergreen() {
+		return (MockPlanPhase) new MockPlanPhase(BillingPeriod.MONTHLY,
+				PhaseType.EVERGREEN,
+				new DefaultDuration().setUnit(TimeUnit.UNLIMITED),
+				MockInternationalPrice.create1USD(),
+				null).setPlan(MockPlan.createBicycleNoTrialEvergreen1USD());
+	}
+	
+	public static MockPlanPhase createUSDMonthlyEvergreen(String reccuringUSDPrice, String fixedPrice) {
+		return new MockPlanPhase(BillingPeriod.MONTHLY,
+				PhaseType.EVERGREEN,
+				new DefaultDuration().setUnit(TimeUnit.UNLIMITED),
+				(reccuringUSDPrice == null) ? null : MockInternationalPrice.createUSD(reccuringUSDPrice),
+				(fixedPrice == null) ? null :MockInternationalPrice.createUSD(fixedPrice));
+	}
+
+	public static MockPlanPhase createUSDMonthlyFixedTerm(String reccuringUSDPrice, String fixedPrice, int durationInMonths) {
+		return new MockPlanPhase(BillingPeriod.MONTHLY,
+				PhaseType.FIXEDTERM,
+				new DefaultDuration().setUnit(TimeUnit.MONTHS).setNumber(durationInMonths),
+				(reccuringUSDPrice == null) ? null : MockInternationalPrice.createUSD(reccuringUSDPrice),
+				(fixedPrice == null) ? null :MockInternationalPrice.createUSD(fixedPrice));
+	}
+
+	public static MockPlanPhase create30DayTrial() {
+		return createTrial(30);
+	}
+
+	public static MockPlanPhase createTrial(int days) {
+		return new MockPlanPhase(BillingPeriod.NO_BILLING_PERIOD,
+				PhaseType.TRIAL,
+				new DefaultDuration().setUnit(TimeUnit.DAYS).setNumber(days),
+				null,
+				MockInternationalPrice.create1USD()
+				);
+	}
 
     public MockPlanPhase(
     		BillingPeriod billingPeriod, 
@@ -38,6 +75,7 @@ public class MockPlanPhase extends DefaultPlanPhase {
 		setFixedPrice(fixedPrice);
 	}
     
+  
     public MockPlanPhase() {
         this(new MockInternationalPrice(), null);
 	}
diff --git a/catalog/src/test/java/com/ning/billing/catalog/MockProduct.java b/catalog/src/test/java/com/ning/billing/catalog/MockProduct.java
index 7587dba..52ffd91 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/MockProduct.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/MockProduct.java
@@ -23,6 +23,54 @@ public class MockProduct extends DefaultProduct {
 	public MockProduct() {
 		setName("TestProduct");
 		setCatagory(ProductCategory.BASE);
-		setCatalogName("Ning");
+		setCatalogName("Vehcles");
 	}
+	
+	public MockProduct(String name, ProductCategory category, String catalogName) {
+		setName(name);
+		setCatagory(category);
+		setCatalogName(catalogName);
+	}
+	
+	public static MockProduct createBicycle() {
+		return new MockProduct("Bicycle", ProductCategory.BASE, "Vehcles");
+	}
+	
+	public static MockProduct createPickup() {
+		return new MockProduct("Pickup", ProductCategory.BASE, "Vehcles");
+	}
+	
+	public static MockProduct createSportsCar() {
+		return new MockProduct("SportsCar", ProductCategory.BASE, "Vehcles");
+	}
+	
+	public static MockProduct createJet() {
+		return new MockProduct("Jet", ProductCategory.BASE, "Vehcles");
+	}
+	
+	public static MockProduct createHorn() {
+		return new MockProduct("Horn", ProductCategory.ADD_ON, "Vehcles");
+	}
+	
+	public static MockProduct createSpotlight() {
+		return new MockProduct("spotlight", ProductCategory.ADD_ON, "Vehcles");
+	}
+	
+	public static MockProduct createRedPaintJob() {
+		return new MockProduct("RedPaintJob", ProductCategory.ADD_ON, "Vehcles");
+	}
+
+	public static DefaultProduct[] createAll() {
+		return new MockProduct[]{
+				createBicycle(),
+				createPickup(),
+				createSportsCar(),
+				createJet(),
+				createHorn(),
+				createRedPaintJob()
+		};
+	}
+	
+	
+	
 }
diff --git a/catalog/src/test/java/com/ning/billing/catalog/TestInternationalPrice.java b/catalog/src/test/java/com/ning/billing/catalog/TestInternationalPrice.java
index 3e6a755..c14a157 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/TestInternationalPrice.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/TestInternationalPrice.java
@@ -66,6 +66,7 @@ public class TestInternationalPrice {
   public void testPriceInitialization() throws URISyntaxException, CatalogApiException  {
 	  StandaloneCatalog c = new MockCatalog();
 	  c.setSupportedCurrencies(new Currency[]{Currency.GBP, Currency.EUR, Currency.USD, Currency.BRL, Currency.MXN});
+	  c.getCurrentPlans()[0].getFinalPhase().getRecurringPrice().setPrices(null);
 	  c.initialize(c, new URI("foo://bar"));
 	  Assert.assertEquals(c.getCurrentPlans()[0].getFinalPhase().getRecurringPrice().getPrice(Currency.GBP), new BigDecimal(0));
   }
diff --git a/catalog/src/test/java/com/ning/billing/catalog/TestPlan.java b/catalog/src/test/java/com/ning/billing/catalog/TestPlan.java
index e58f71c..a8714c1 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/TestPlan.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/TestPlan.java
@@ -37,7 +37,7 @@ public class TestPlan {
 
 		StandaloneCatalog c = new MockCatalog();
 		c.setSupportedCurrencies(new Currency[]{Currency.GBP, Currency.EUR, Currency.USD, Currency.BRL, Currency.MXN});
-		DefaultPlan p1 =  new MockPlan();
+		DefaultPlan p1 =  MockPlan.createBicycleTrialEvergreen1USD();
 		p1.setEffectiveDateForExistingSubscriptons(new Date((new Date().getTime()) - (1000 * 60 * 60 * 24)));
 		ValidationErrors errors = p1.validate(c, new ValidationErrors());
 		Assert.assertEquals(errors.size(), 1);
@@ -45,79 +45,18 @@ public class TestPlan {
 
 	}
 	
-	private static class MyDuration extends DefaultDuration {
-		final int days;
-		
-		public MyDuration(int days) {
-			this.days = days;
-		}
-		
-		@Override
-		public DateTime addToDateTime(DateTime dateTime) {
-			return dateTime.plusDays(days);
-		}
-	}
-	
-	private static class MyPlanPhase extends MockPlanPhase {
-		Duration duration;
-		boolean recurringPriceIsZero;
-		
-		MyPlanPhase(int duration, boolean recurringPriceIsZero) {
-			this.duration= new MyDuration( duration );
-			this.recurringPriceIsZero = recurringPriceIsZero;
-		}
-		@Override
-		public Duration getDuration(){
-			return duration;
-		}
-		
-		@Override
-		public InternationalPrice getRecurringPrice() {
-			return new MockInternationalPrice() {
-				@Override
-				public boolean isZero() {
-					return recurringPriceIsZero;
-				}
-			};
-		}
-	}
-	
 	@Test(groups={"fast"}, enabled = true)
 	public void testDataCalc() {
-		DefaultPlan p0 =  new MockPlan() {
-			public PlanPhase[] getAllPhases() {
-				return new PlanPhase[]{
-						new MyPlanPhase(10, true),
-						new MyPlanPhase(10, false),
-				};
-			}
-		};
+		DefaultPlan p0 = MockPlan.createBicycleTrialEvergreen1USD();  
+					
+		DefaultPlan p1 =  MockPlan.createBicycleTrialEvergreen1USD(100);
 		
-		DefaultPlan p1 =  new MockPlan() {
-			public PlanPhase[] getAllPhases() {
-				return new PlanPhase[]{
-						new MyPlanPhase(10, true),
-						new MyPlanPhase(10, true),
-						new MyPlanPhase(10, true),
-						new MyPlanPhase(10, true),
-						new MyPlanPhase(10, false),
-						new MyPlanPhase(10, true),
-				};
-			}
-		};
+		DefaultPlan p2 =  MockPlan.createBicycleNoTrialEvergreen1USD(); 
 		
-		DefaultPlan p2 =  new MockPlan() {
-			public PlanPhase[] getAllPhases() {
-				return new PlanPhase[]{
-						new MyPlanPhase(10, false),
-						new MyPlanPhase(10, true),
-				};
-			}
-		};
 		DateTime requestedDate = new DateTime();
-		Assert.assertEquals(p0.dateOfFirstRecurringNonZeroCharge(requestedDate), requestedDate.plusDays(10));
-		Assert.assertEquals(p1.dateOfFirstRecurringNonZeroCharge(requestedDate), requestedDate.plusDays(40));
-		Assert.assertEquals(p2.dateOfFirstRecurringNonZeroCharge(requestedDate), requestedDate.plusDays(0));
+		Assert.assertEquals(p0.dateOfFirstRecurringNonZeroCharge(requestedDate).compareTo(requestedDate.plusDays(30)), 0);
+		Assert.assertEquals(p1.dateOfFirstRecurringNonZeroCharge(requestedDate).compareTo(requestedDate.plusDays(100)), 0);
+		Assert.assertEquals(p2.dateOfFirstRecurringNonZeroCharge(requestedDate).compareTo(requestedDate.plusDays(0)), 0);
 
 	}
 }
diff --git a/catalog/src/test/java/com/ning/billing/catalog/TestPlanPhase.java b/catalog/src/test/java/com/ning/billing/catalog/TestPlanPhase.java
index 9b07bfe..a3b825d 100644
--- a/catalog/src/test/java/com/ning/billing/catalog/TestPlanPhase.java
+++ b/catalog/src/test/java/com/ning/billing/catalog/TestPlanPhase.java
@@ -32,17 +32,18 @@ public class TestPlanPhase {
 	public void testValidation() {
 		log.info("Testing Plan Phase Validation");
 		
-		DefaultPlanPhase pp = new MockPlanPhase().setBillCycleDuration(BillingPeriod.MONTHLY).setRecurringPrice(null).setFixedPrice(new DefaultInternationalPrice());
+		DefaultPlanPhase pp = MockPlanPhase.createUSDMonthlyEvergreen(null, "1.00").setPlan(MockPlan.createBicycleNoTrialEvergreen1USD());//new MockPlanPhase().setBillCycleDuration(BillingPeriod.MONTHLY).setRecurringPrice(null).setFixedPrice(new DefaultInternationalPrice());
+		
 		ValidationErrors errors = pp.validate(new MockCatalog(), new ValidationErrors());
 		errors.log(log);
 		Assert.assertEquals(errors.size(), 1);
 
-		pp = new MockPlanPhase().setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD).setRecurringPrice(new MockInternationalPrice());
+		pp = MockPlanPhase.createUSDMonthlyEvergreen("1.00", null).setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD).setPlan(MockPlan.createBicycleNoTrialEvergreen1USD());// new MockPlanPhase().setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD).setRecurringPrice(new MockInternationalPrice());
 		errors = pp.validate(new MockCatalog(), new ValidationErrors());
 		errors.log(log);
 		Assert.assertEquals(errors.size(), 1);
 
-		pp = new MockPlanPhase().setRecurringPrice(null).setFixedPrice(null).setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD);
+		pp = MockPlanPhase.createUSDMonthlyEvergreen(null, null).setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD).setPlan(MockPlan.createBicycleNoTrialEvergreen1USD());//new MockPlanPhase().setRecurringPrice(null).setFixedPrice(null).setBillCycleDuration(BillingPeriod.NO_BILLING_PERIOD);
 		errors = pp.validate(new MockCatalog(), new ValidationErrors());
 		errors.log(log);
 		Assert.assertEquals(errors.size(), 1);
@@ -53,11 +54,11 @@ public class TestPlanPhase {
 		String planName = "Foo";
 		String planNameExt = planName + "-";
 		
-		DefaultPlan p = new MockPlan().setName(planName);
-		DefaultPlanPhase ppDiscount = new MockPlanPhase().setPhaseType(PhaseType.DISCOUNT).setPlan(p);
-		DefaultPlanPhase ppTrial = new MockPlanPhase().setPhaseType(PhaseType.TRIAL).setPlan(p);
-		DefaultPlanPhase ppEvergreen = new MockPlanPhase().setPhaseType(PhaseType.EVERGREEN).setPlan(p);
-		DefaultPlanPhase ppFixedterm = new MockPlanPhase().setPhaseType(PhaseType.FIXEDTERM).setPlan(p);
+		DefaultPlan p = MockPlan.createBicycleNoTrialEvergreen1USD().setName(planName);
+		DefaultPlanPhase ppDiscount = MockPlanPhase.create1USDMonthlyEvergreen().setPhaseType(PhaseType.DISCOUNT).setPlan(p);
+		DefaultPlanPhase ppTrial = MockPlanPhase.create30DayTrial().setPhaseType(PhaseType.TRIAL).setPlan(p);
+		DefaultPlanPhase ppEvergreen = MockPlanPhase.create1USDMonthlyEvergreen().setPhaseType(PhaseType.EVERGREEN).setPlan(p);
+		DefaultPlanPhase ppFixedterm = MockPlanPhase.create1USDMonthlyEvergreen().setPhaseType(PhaseType.FIXEDTERM).setPlan(p);
 		
 		String ppnDiscount = DefaultPlanPhase.phaseName(p, ppDiscount);
 		String ppnTrial = DefaultPlanPhase.phaseName(p, ppTrial);
diff --git a/catalog/src/test/java/com/ning/billing/mock/catalog/MockPlan.java b/catalog/src/test/java/com/ning/billing/mock/catalog/MockPlan.java
new file mode 100644
index 0000000..0657fd4
--- /dev/null
+++ b/catalog/src/test/java/com/ning/billing/mock/catalog/MockPlan.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.mock.catalog;
+
+import java.util.Date;
+import java.util.Iterator;
+
+import org.joda.time.DateTime;
+
+import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.CatalogApiException;
+import com.ning.billing.catalog.api.Plan;
+import com.ning.billing.catalog.api.PlanPhase;
+import com.ning.billing.catalog.api.Product;
+
+public class MockPlan implements Plan {
+
+	@Override
+	public PlanPhase[] getInitialPhases() {
+		return null;
+	}
+
+	@Override
+	public Product getProduct() {
+		return null;
+	}
+
+	@Override
+	public String getName() {
+		return null;
+	}
+
+	@Override
+	public boolean isRetired() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public Iterator<PlanPhase> getInitialPhaseIterator() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public PlanPhase getFinalPhase() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public BillingPeriod getBillingPeriod() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public int getPlansAllowedInBundle() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	@Override
+	public PlanPhase[] getAllPhases() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public Date getEffectiveDateForExistingSubscriptons() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public PlanPhase findPhase(String name) throws CatalogApiException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public DateTime dateOfFirstRecurringNonZeroCharge(
+			DateTime subscriptionStartDate) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}
diff --git a/entitlement/pom.xml b/entitlement/pom.xml
index f8a781f..cf8d926 100644
--- a/entitlement/pom.xml
+++ b/entitlement/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-entitlement</artifactId>
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
index 34de22b..3161af3 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/alignment/PlanAligner.java
@@ -236,7 +236,8 @@ public class PlanAligner  {
             planStartDate = bundleStartDate;
             break;
         case CHANGE_OF_PLAN:
-            throw new EntitlementError(String.format("Not implemented yet %s", alignment));
+            planStartDate = requestedDate;
+            break;
         case CHANGE_OF_PRICELIST:
             throw new EntitlementError(String.format("Not implemented yet %s", alignment));
         default:
@@ -267,6 +268,7 @@ public class PlanAligner  {
 
             result.add(new TimedPhase(cur, curPhaseStart));
 
+            // STEPH check for duration null instead TimeUnit UNLIMITED
             if (cur.getPhaseType() != PhaseType.EVERGREEN) {
                 Duration curPhaseDuration = cur.getDuration();
                 nextPhaseStart = DefaultClock.addDuration(curPhaseStart, curPhaseDuration);
@@ -283,6 +285,7 @@ public class PlanAligner  {
         return result;
     }
 
+    // STEPH check for non evergreen Plans and what happens
     private TimedPhase getTimedPhase(List<TimedPhase> timedPhases, DateTime effectiveDate, WhichPhase which) {
         TimedPhase cur = null;
         TimedPhase next = null;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/DefaultEntitlementBillingApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/DefaultEntitlementBillingApi.java
index ee9ec81..788cda1 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/DefaultEntitlementBillingApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/billing/DefaultEntitlementBillingApi.java
@@ -16,10 +16,24 @@ w * Copyright 2010-2011 Ning, Inc.
 
 package com.ning.billing.entitlement.api.billing;
 
+import java.util.Date;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.account.api.AccountUserApi;
+import com.ning.billing.account.api.DefaultAccount;
 import com.ning.billing.catalog.api.BillingAlignment;
 import com.ning.billing.catalog.api.Catalog;
 import com.ning.billing.catalog.api.CatalogApiException;
@@ -32,34 +46,23 @@ import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
-import com.ning.billing.entitlement.api.user.SubscriptionTransition.SubscriptionTransitionType;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
+import com.ning.billing.entitlement.api.user.SubscriptionTransition.SubscriptionTransitionType;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.SubscriptionSqlDao;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
 
 
 public class DefaultEntitlementBillingApi implements EntitlementBillingApi {
 	private static final Logger log = LoggerFactory.getLogger(DefaultEntitlementBillingApi.class);
 
-    private final EntitlementDao dao;
+    private final EntitlementDao entitlementDao;
     private final AccountUserApi accountApi;
     private final CatalogService catalogService;
 
     @Inject
     public DefaultEntitlementBillingApi(final EntitlementDao dao, final AccountUserApi accountApi, final CatalogService catalogService) {
         super();
-        this.dao = dao;
+        this.entitlementDao = dao;
         this.accountApi = accountApi;
         this.catalogService = catalogService;
     }
@@ -68,23 +71,22 @@ public class DefaultEntitlementBillingApi implements EntitlementBillingApi {
     public SortedSet<BillingEvent> getBillingEventsForAccount(
             final UUID accountId) {
 
-        List<SubscriptionBundle> bundles = dao.getSubscriptionBundleForAccount(accountId);
-        List<Subscription> subscriptions = new ArrayList<Subscription>();
-        for (final SubscriptionBundle bundle: bundles) {
-            subscriptions.addAll(dao.getSubscriptions(bundle.getId()));
-        }
-
+        List<SubscriptionBundle> bundles = entitlementDao.getSubscriptionBundleForAccount(accountId);
         SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();
-        for (final Subscription subscription: subscriptions) {
-        	for (final SubscriptionTransition transition : subscription.getAllTransitions()) {
-        		try {
-                    BillingEvent event = new DefaultBillingEvent(transition, subscription, calculateBCD(transition, accountId));
-        			result.add(event);
-        		} catch (CatalogApiException e) {
-        			log.error("Failing to identify catalog components while creating BillingEvent from transition: " +
-        					transition.getId().toString(), e);
-                } catch (Exception e) {
-                    log.warn("Failed while getting BillingEvent", e);
+        for (final SubscriptionBundle bundle: bundles) {
+        	List<Subscription> subscriptions = entitlementDao.getSubscriptions(bundle.getId());
+
+        	for (final Subscription subscription: subscriptions) {
+        		for (final SubscriptionTransition transition : subscription.getAllTransitions()) {
+        			try {
+        				BillingEvent event = new DefaultBillingEvent(transition, subscription, calculateBcd(bundle, subscription, transition, accountId));
+        				result.add(event);
+        			} catch (CatalogApiException e) {
+        				log.error("Failing to identify catalog components while creating BillingEvent from transition: " +
+        						transition.getId().toString(), e);
+        			} catch (Exception e) {
+        				log.warn("Failed while getting BillingEvent", e);
+        			}
         		}
         	}
         }
@@ -93,10 +95,10 @@ public class DefaultEntitlementBillingApi implements EntitlementBillingApi {
 
     @Override
     public UUID getAccountIdFromSubscriptionId(final UUID subscriptionId) {
-        return dao.getAccountIdFromSubscriptionId(subscriptionId);
+        return entitlementDao.getAccountIdFromSubscriptionId(subscriptionId);
     }
 
-    private int calculateBCD(final SubscriptionTransition transition, final UUID accountId) throws CatalogApiException {
+    private int calculateBcd(SubscriptionBundle bundle, Subscription subscription, final SubscriptionTransition transition, final UUID accountId) throws CatalogApiException, AccountApiException {
     	Catalog catalog = catalogService.getFullCatalog();
     	Plan plan =  (transition.getTransitionType() != SubscriptionTransitionType.CANCEL) ?
     	        transition.getNextPlan() : transition.getPreviousPlan();
@@ -111,41 +113,89 @@ public class DefaultEntitlementBillingApi implements EntitlementBillingApi {
     					transition.getNextPriceList(),
     					phase.getPhaseType()),
     					transition.getRequestedTransitionTime());
-    	int result = 0;
+    	int result = -1;
 
-        Account account = accountApi.getAccountById(accountId);
-
-    	switch (alignment) {
+		Account account = accountApi.getAccountById(accountId);
+		switch (alignment) {
     		case ACCOUNT :
     			result = account.getBillCycleDay();
+    			
+    			if(result == 0) {
+    				result = calculateBcdFromSubscription(subscription, plan, account);
+    			}
     		break;
     		case BUNDLE :
-    			SubscriptionBundle bundle = dao.getSubscriptionBundleFromId(transition.getBundleId());
-    			//TODO result = bundle.getStartDate().toDateTime(account.getTimeZone()).getDayOfMonth();
-    			result = bundle.getStartDate().getDayOfMonth();
+    			result = bundle.getStartDate().toDateTime(account.getTimeZone()).getDayOfMonth();
     		break;
     		case SUBSCRIPTION :
-    			Subscription subscription = dao.getSubscriptionFromId(transition.getSubscriptionId());
-    			//TODO result = subscription.getStartDate().toDateTime(account.getTimeZone()).getDayOfMonth();
-    			result = subscription.getStartDate().getDayOfMonth();
+    			result = subscription.getStartDate().toDateTime(account.getTimeZone()).getDayOfMonth();
     		break;
     	}
-    	if(result == 0) {
+    	if(result == -1) {
     		throw new CatalogApiException(ErrorCode.CAT_INVALID_BILLING_ALIGNMENT, alignment.toString());
     	}
     	return result;
 
     }
+    
+   	private int calculateBcdFromSubscription(Subscription subscription, Plan plan, Account account) throws AccountApiException {
+		int result = account.getBillCycleDay();
+        if(result != 0) {
+            return result;
+        }
+        result = new DateTime(account.getTimeZone()).getDayOfMonth();
 
+        try {
+        	result = billCycleDay(subscription.getStartDate(),account.getTimeZone(), plan);
+        } catch (CatalogApiException e) {
+            log.error("Unexpected catalog error encountered when updating BCD",e);
+        }
+        
+
+        Account modifiedAccount = new DefaultAccount(
+                account.getId(),
+                account.getExternalKey(),
+                account.getEmail(),
+                account.getName(),
+                account.getFirstNameLength(),
+                account.getCurrency(),
+                result,
+                account.getPaymentProviderName(),
+                account.getTimeZone(),
+                account.getLocale(),
+                account.getAddress1(),
+                account.getAddress2(),
+                account.getCompanyName(),
+                account.getCity(),
+                account.getStateOrProvince(),
+                account.getCountry(),
+                account.getPostalCode(),
+                account.getPhone(),
+                account.getCreatedDate(),
+                null // Updated date will be set internally
+        );
+        accountApi.updateAccount(modifiedAccount);
+        return result;
+    }
+
+    private int billCycleDay(DateTime requestedDate, DateTimeZone timeZone, 
+    		Plan plan) throws CatalogApiException {
+
+        DateTime date = plan.dateOfFirstRecurringNonZeroCharge(requestedDate);
+        return date.toDateTime(timeZone).getDayOfMonth();
+
+    }
+    
+    
     @Override
     public void setChargedThroughDate(final UUID subscriptionId, final DateTime ctd) {
-        SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionId);
+        SubscriptionData subscription = (SubscriptionData) entitlementDao.getSubscriptionFromId(subscriptionId);
 
         SubscriptionBuilder builder = new SubscriptionBuilder(subscription)
             .setChargedThroughDate(ctd)
             .setPaidThroughDate(subscription.getPaidThroughDate());
 
-        dao.updateSubscription(new SubscriptionData(builder));
+        entitlementDao.updateSubscription(new SubscriptionData(builder));
     }
 
     @Override
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index 05a063d..7a1f9a9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -166,7 +166,7 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
         for (TimedMigration cur : migrationEvents) {
 
             if (cur.getEventType() == EventType.PHASE) {
-                PhaseEvent nextPhaseEvent = PhaseEventData.getNextPhaseEvent(cur.getPhase().getName(), subscriptionData, now, cur.getEventTime());
+                PhaseEvent nextPhaseEvent = PhaseEventData.createNextPhaseEvent(cur.getPhase().getName(), subscriptionData, now, cur.getEventTime());
                 events.add(nextPhaseEvent);
 
             } else if (cur.getEventType() == EventType.API_USER) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
index 5b0f45c..1c523df 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionApiService.java
@@ -73,7 +73,7 @@ public class SubscriptionApiService {
 
             TimedPhase nextTimedPhase = curAndNextPhases[1];
             PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
-                    PhaseEventData.getNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, processedDate, nextTimedPhase.getStartPhase()) :
+                    PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, processedDate, nextTimedPhase.getStartPhase()) :
                         null;
             List<EntitlementEvent> events = new ArrayList<EntitlementEvent>();
             events.add(creationEvent);
@@ -147,7 +147,7 @@ public class SubscriptionApiService {
 
         TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, now);
         PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
-                PhaseEventData.getNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
+                PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
                     null;
         if (nextPhaseEvent != null) {
             uncancelEvents.add(nextPhaseEvent);
@@ -199,7 +199,7 @@ public class SubscriptionApiService {
             ActionPolicy policy = planChangeResult.getPolicy();
             PriceList newPriceList = planChangeResult.getNewPriceList();
 
-            Plan newPlan = catalogService.getFullCatalog().findPlan(productName, term, newPriceList.getName(), requestedDate);
+            Plan newPlan = catalogService.getFullCatalog().findPlan(productName, term, newPriceList.getName(), requestedDate, subscription.getStartDate());
             DateTime effectiveDate = subscription.getPlanChangeEffectiveDate(policy, requestedDate);
 
             TimedPhase currentTimedPhase = planAligner.getCurrentTimedPhaseOnChange(subscription, newPlan, newPriceList.getName(), requestedDate, effectiveDate);
@@ -216,7 +216,7 @@ public class SubscriptionApiService {
 
             TimedPhase nextTimedPhase = planAligner.getNextTimedPhaseOnChange(subscription, newPlan, newPriceList.getName(), requestedDate, effectiveDate);
             PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
-                    PhaseEventData.getNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
+                    PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
                         null;
                     List<EntitlementEvent> changeEvents = new ArrayList<EntitlementEvent>();
                     // Only add the PHASE if it does not coincide with the CHANGE, if not this is 'just' a CHANGE.
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 20e4663..d94af98 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -202,12 +202,14 @@ public class SubscriptionData implements Subscription {
         return null;
     }
 
+    @Override
     public SubscriptionTransition getPreviousTransition() {
-
         if (transitions == null) {
             return null;
         }
-        SubscriptionTransition latestSubscription = null;
+
+        // ensure that the latestSubscription is always set; prevents NPEs
+        SubscriptionTransition latestSubscription = transitions.get(0);
         for (SubscriptionTransition cur : transitions) {
             if (cur.getEffectiveTransitionTime().isAfter(clock.getUTCNow())) {
                 break;
@@ -299,17 +301,13 @@ public class SubscriptionData implements Subscription {
             throw new EntitlementError(String.format("Unexpected policy type %s", policy.toString()));
         }
 
-        //
-        // If CTD is null or CTD in the past, we default to the start date of the current phase
-        //
-        DateTime effectiveDate = chargedThroughDate;
-        if (chargedThroughDate == null || chargedThroughDate.isBefore(clock.getUTCNow())) {
-            effectiveDate = getCurrentPhaseStart();
+        if (chargedThroughDate == null) {
+            return requestedDate;
+        } else {
+            return chargedThroughDate.isBefore(requestedDate) ? requestedDate : chargedThroughDate;
         }
-        return effectiveDate;
     }
 
-
     public DateTime getCurrentPhaseStart() {
 
         if (transitions == null) {
@@ -323,10 +321,12 @@ public class SubscriptionData implements Subscription {
                 // Skip future events
                 continue;
             }
-            if (cur.getEventType() == EventType.PHASE) {
+            if (cur.getEventType() == EventType.PHASE
+                    || (cur.getEventType() == EventType.API_USER && cur.getApiEventType() == ApiEventType.CHANGE)) {
                 return cur.getEffectiveTransitionTime();
             }
         }
+
         // CREATE event
         return transitions.get(0).getEffectiveTransitionTime();
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index c7294b4..2646d3e 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -84,7 +84,7 @@ public class Engine implements EventListener, EntitlementService {
     private final EntitlementConfig config;
     private final NotificationQueueService notificationQueueService;
 
-    private NotificationQueue subscritionEventQueue;
+    private NotificationQueue subscriptionEventQueue;
 
     @Inject
     public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
@@ -114,7 +114,7 @@ public class Engine implements EventListener, EntitlementService {
     public void initialize() {
 
         try {
-            subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+            subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
                     NOTIFICATION_QUEUE_NAME,
                     new NotificationQueueHandler() {
                 @Override
@@ -142,7 +142,7 @@ public class Engine implements EventListener, EntitlementService {
                 }
                 @Override
                 public long getDaoClaimTimeMs() {
-                    return config.getDaoMaxReadyEvents();
+                    return config.getDaoClaimTimeMs();
                 }
             });
         } catch (NotificationQueueAlreadyExists e) {
@@ -152,13 +152,13 @@ public class Engine implements EventListener, EntitlementService {
 
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
-        subscritionEventQueue.startQueue();
+        subscriptionEventQueue.startQueue();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
-        if (subscritionEventQueue != null) {
-            subscritionEventQueue.stopQueue();
+        if (subscriptionEventQueue != null) {
+            subscriptionEventQueue.stopQueue();
          }
     }
 
@@ -211,7 +211,7 @@ public class Engine implements EventListener, EntitlementService {
             DateTime now = clock.getUTCNow();
             TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, now);
             PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
-                    PhaseEventData.getNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
+                    PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
                         null;
             if (nextPhaseEvent != null) {
                 dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index c9ddf90..5a04a47 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -16,17 +16,17 @@
 
 package com.ning.billing.entitlement.engine.dao;
 
-import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
-import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
 
 public interface EntitlementDao {
 
@@ -76,4 +76,5 @@ public interface EntitlementDao {
     public void migrate(UUID acountId, AccountMigrationData data);
 
     public void undoMigration(UUID accountId);
-}
+
+	}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index 062a0d5..dff8fad 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -21,18 +21,18 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.UUID;
+
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.billing.EntitlementBillingApiException;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -53,7 +53,6 @@ import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import sun.jkernel.Bundle;
 
 
 public class EntitlementSqlDao implements EntitlementDao {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/SubscriptionSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/SubscriptionSqlDao.java
index d896ebe..7b2ddcc 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/SubscriptionSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/SubscriptionSqlDao.java
@@ -46,7 +46,8 @@ import java.util.UUID;
 @ExternalizedSqlViaStringTemplate3()
 public interface SubscriptionSqlDao extends Transactional<SubscriptionSqlDao>, CloseMe, Transmogrifier {
 
-    @SqlUpdate
+
+	@SqlUpdate
     public void insertSubscription(@Bind(binder = ISubscriptionDaoBinder.class) SubscriptionData sub);
 
     @SqlUpdate
@@ -62,7 +63,7 @@ public interface SubscriptionSqlDao extends Transactional<SubscriptionSqlDao>, C
 
     @SqlUpdate
     public void updateSubscription(@Bind("id") String id, @Bind("active_version") long activeVersion, @Bind("ctd_dt") Date ctd, @Bind("ptd_dt") Date ptd);
-
+   
     public static class ISubscriptionDaoBinder implements Binder<Bind, SubscriptionData> {
 
         private Date getDate(DateTime dateTime) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/phase/PhaseEventData.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/phase/PhaseEventData.java
index 9744363..47546ea 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/phase/PhaseEventData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/phase/PhaseEventData.java
@@ -56,7 +56,7 @@ public class PhaseEventData extends EventBase implements PhaseEvent {
                 + ", isActive()=" + isActive() + "]\n";
     }
 
-    public static final PhaseEvent getNextPhaseEvent(String phaseName, SubscriptionData subscription, DateTime now, DateTime effectiveDate) {
+    public static final PhaseEvent createNextPhaseEvent(String phaseName, SubscriptionData subscription, DateTime now, DateTime effectiveDate) {
         return (phaseName == null) ?
                 null :
                     new PhaseEventData(new PhaseEventBuilder()
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index dfdc746..c39d001 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -1,3 +1,4 @@
+DROP TABLE IF EXISTS events;
 DROP TABLE IF EXISTS entitlement_events;
 CREATE TABLE entitlement_events (
     id int(11) unsigned NOT NULL AUTO_INCREMENT,
@@ -16,6 +17,9 @@ CREATE TABLE entitlement_events (
     is_active bool DEFAULT 1,
     PRIMARY KEY(id)
 ) ENGINE=innodb;
+CREATE INDEX idx_ent_1 ON entitlement_events(subscription_id,is_active,effective_dt);
+CREATE INDEX idx_ent_2 ON entitlement_events(subscription_id,effective_dt,created_dt,requested_dt,id);
+
 
 DROP TABLE IF EXISTS subscriptions;
 CREATE TABLE subscriptions (
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
index 91ddc91..2d99f8d 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/TestDefaultEntitlementBillingApi.java
@@ -17,12 +17,15 @@
 package com.ning.billing.entitlement.api.billing;
 
 
+import static org.testng.Assert.assertTrue;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -42,7 +45,6 @@ import com.ning.billing.catalog.api.PlanPhase;
 import com.ning.billing.catalog.api.PriceListSet;
 import com.ning.billing.catalog.glue.CatalogModule;
 import com.ning.billing.entitlement.api.TestApiBase;
-import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
@@ -55,11 +57,11 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.lifecycle.KillbillService.ServiceException;
+import com.ning.billing.mock.BrainDeadProxyFactory;
+import com.ning.billing.mock.BrainDeadProxyFactory.ZombieControl;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.glue.ClockModule;
 
-import static org.testng.Assert.assertTrue;
-
 public class TestDefaultEntitlementBillingApi {
 	private static final UUID zeroId = new UUID(0L,0L);
 	private static final UUID oneId = new UUID(1L,0L);
@@ -90,7 +92,7 @@ public class TestDefaultEntitlementBillingApi {
 	@BeforeMethod(alwaysRun=true)
 	public void setupEveryTime() {
 		bundles = new ArrayList<SubscriptionBundle>();
-		final SubscriptionBundle bundle = new SubscriptionBundleData( zeroId,"TestKey", oneId,  new DateTime().minusDays(4));
+		final SubscriptionBundle bundle = new SubscriptionBundleData( zeroId,"TestKey", oneId,  clock.getUTCNow().minusDays(4));
 		bundles.add(bundle);
 		
 		
@@ -98,7 +100,7 @@ public class TestDefaultEntitlementBillingApi {
 		subscriptions = new ArrayList<Subscription>();
 		
 		SubscriptionBuilder builder = new SubscriptionBuilder();
-		subscriptionStartDate = new DateTime().minusDays(3);
+		subscriptionStartDate = clock.getUTCNow().minusDays(3);
 		builder.setStartDate(subscriptionStartDate).setId(oneId);
 		subscription = new SubscriptionData(builder) {
 		    public List<SubscriptionTransition> getAllTransitions() {
@@ -194,15 +196,12 @@ public class TestDefaultEntitlementBillingApi {
 				zeroId, oneId, twoId, EventType.API_USER, ApiEventType.CREATE, then, now, null, null, null, null, SubscriptionState.ACTIVE, nextPlan, nextPhase, nextPriceList);
 		transitions.add(t);
 		
-		AccountUserApi accountApi = new BrainDeadAccountUserApi(){
-
-			@Override
-			public Account getAccountById(UUID accountId) {
-				return new BrainDeadAccount(){@Override
-				public int getBillCycleDay() {
-					return 1;
-				}};
-			}} ;
+		Account account = BrainDeadProxyFactory.createBrainDeadProxyFor(Account.class); 
+		((ZombieControl)account).addResult("getBillCycleDay", 1).addResult("getTimeZone", DateTimeZone.UTC);
+		
+		AccountUserApi accountApi = BrainDeadProxyFactory.createBrainDeadProxyFor(AccountUserApi.class);			
+		((ZombieControl)accountApi).addResult("getAccountById", account);
+				
 		DefaultEntitlementBillingApi api = new DefaultEntitlementBillingApi(dao,accountApi,catalogService);
 		SortedSet<BillingEvent> events = api.getBillingEventsForAccount(new UUID(0L,0L));
 		checkFirstEvent(events, nextPlan, subscription.getStartDate().getDayOfMonth(), oneId, now, nextPhase, ApiEventType.CREATE.toString());
@@ -244,15 +243,12 @@ public class TestDefaultEntitlementBillingApi {
 				zeroId, oneId, twoId, EventType.API_USER, ApiEventType.CREATE, then, now, null, null, null, null, SubscriptionState.ACTIVE, nextPlan, nextPhase, nextPriceList);
 		transitions.add(t);
 		
-		AccountUserApi accountApi = new BrainDeadAccountUserApi(){
-
-			@Override
-			public Account getAccountById(UUID accountId) {
-				return new BrainDeadAccount(){@Override
-				public int getBillCycleDay() {
-					return 1;
-				}};
-			}} ;
+		Account account = BrainDeadProxyFactory.createBrainDeadProxyFor(Account.class); 
+		((ZombieControl)account).addResult("getBillCycleDay", 1).addResult("getTimeZone", DateTimeZone.UTC);
+		
+		AccountUserApi accountApi = BrainDeadProxyFactory.createBrainDeadProxyFor(AccountUserApi.class);			
+		((ZombieControl)accountApi).addResult("getAccountById", account);
+				
 		DefaultEntitlementBillingApi api = new DefaultEntitlementBillingApi(dao,accountApi,catalogService);
 		SortedSet<BillingEvent> events = api.getBillingEventsForAccount(new UUID(0L,0L));
 		checkFirstEvent(events, nextPlan, bundles.get(0).getStartDate().getDayOfMonth(), oneId, now, nextPhase, ApiEventType.CREATE.toString());

invoice/pom.xml 2(+1 -1)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index 81f9eb0..a91cb5d 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-invoice</artifactId>
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index da7f565..d38773a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -20,22 +20,25 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.UUID;
 
-import com.ning.billing.invoice.InvoiceListener;
-import com.ning.billing.invoice.api.InvoicePayment;
 import org.joda.time.DateTime;
+
 import com.google.inject.Inject;
+import com.ning.billing.invoice.InvoiceDispatcher;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.invoice.api.InvoiceItem;
+import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.api.InvoiceUserApi;
 import com.ning.billing.invoice.dao.InvoiceDao;
 
 public class DefaultInvoiceUserApi implements InvoiceUserApi {
     private final InvoiceDao dao;
+    private final InvoiceDispatcher dispatcher;
 
     @Inject
-    public DefaultInvoiceUserApi(final InvoiceDao dao) {
+    public DefaultInvoiceUserApi(final InvoiceDao dao, final InvoiceDispatcher dispatcher) {
         this.dao = dao;
+        this.dispatcher = dispatcher;
     }
 
     @Override
@@ -78,4 +81,10 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
     public List<Invoice> getUnpaidInvoicesByAccountId(final UUID accountId, final DateTime upToDate) {
         return dao.getUnpaidInvoicesByAccountId(accountId, upToDate);
     }
+
+	@Override
+	public Invoice triggerInvoiceGeneration(UUID accountId,
+			DateTime targetDate, boolean dryrun) throws InvoiceApiException {
+		return dispatcher.processAccount(accountId, targetDate, dryrun);
+	}
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 3b27fa6..9b475d1 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -28,13 +28,11 @@ import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
-import com.ning.billing.invoice.api.DefaultInvoiceService;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoiceCreationNotification;
 import com.ning.billing.invoice.api.InvoiceItem;
@@ -42,13 +40,8 @@ import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
 import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
 import com.ning.billing.invoice.model.RecurringInvoiceItem;
-import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
 import com.ning.billing.invoice.notification.NextBillingDatePoster;
 import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 
 public class DefaultInvoiceDao implements InvoiceDao {
     private final static Logger log = LoggerFactory.getLogger(DefaultInvoiceDao.class);
@@ -174,17 +167,22 @@ public class DefaultInvoiceDao implements InvoiceDao {
                     List<InvoicePayment> invoicePayments = invoice.getPayments();
                     InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
                     invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments);
-
-                    InvoiceCreationNotification event;
-                    event = new DefaultInvoiceCreationNotification(invoice.getId(), invoice.getAccountId(),
-                                                                  invoice.getBalance(), invoice.getCurrency(),
-                                                                  invoice.getInvoiceDate());
-                    eventBus.postFromTransaction(event, invoiceDao);
                 }
 
                 return null;
             }
         });
+
+        // TODO: move this inside the transaction once the bus is persistent
+        InvoiceCreationNotification event;
+        event = new DefaultInvoiceCreationNotification(invoice.getId(), invoice.getAccountId(),
+                                                      invoice.getBalance(), invoice.getCurrency(),
+                                                      invoice.getInvoiceDate());
+        try {
+            eventBus.post(event);
+        } catch (Bus.EventBusException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
index 1dfac5b..f6e6af0 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
@@ -52,10 +52,6 @@ public class InvoiceModule extends AbstractModule {
         bind(InvoicePaymentApi.class).to(DefaultInvoicePaymentApi.class).asEagerSingleton();
     }
 
-    protected void installClock() {
-    	install(new ClockModule());
-    }
-
     protected void installConfig() {
         final InvoiceConfig config = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
         bind(InvoiceConfig.class).toInstance(config);
@@ -70,8 +66,11 @@ public class InvoiceModule extends AbstractModule {
         bind(NextBillingDatePoster.class).to(DefaultNextBillingDatePoster.class).asEagerSingleton();
     }
 
-    protected void installInvoiceListener() {
+    protected void installGlobalLocker() {
         install(new GlobalLockerModule());
+    }
+
+    protected void installInvoiceListener() {
         bind(InvoiceListener.class).asEagerSingleton();
     }
 
@@ -85,5 +84,6 @@ public class InvoiceModule extends AbstractModule {
         installInvoiceDao();
         installInvoiceUserApi();
         installInvoicePaymentApi();
+        installGlobalLocker();
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index 0a23d01..e79df61 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -89,31 +89,35 @@ public class InvoiceDispatcher {
                     new InvoiceApiException(ErrorCode.INVOICE_NO_ACCOUNT_ID_FOR_SUBSCRIPTION_ID, subscriptionId.toString()));
             return;
         }
-
-        GlobalLock lock = null;
+        processAccount(accountId, targetDate, false);
+    }
+    
+    public Invoice processAccount(UUID accountId, DateTime targetDate, boolean dryrun) throws InvoiceApiException {
+		GlobalLock lock = null;
         try {
             lock = locker.lockWithNumberOfTries(LockerService.INVOICE, accountId.toString(), NB_LOCK_TRY);
 
-            processAccountWithLock(accountId, targetDate);
+            return processAccountWithLock(accountId, targetDate, dryrun);
 
         } catch (LockFailedException e) {
             // Not good!
-            log.error(String.format("Failed to process invoice for account %s, subscription %s, targetDate %s",
-                    accountId.toString(), subscriptionId.toString(), targetDate), e);
+            log.error(String.format("Failed to process invoice for account %s, targetDate %s",
+                    accountId.toString(), targetDate), e);
         } finally {
             if (lock != null) {
                 lock.release();
             }
         }
+        return null;
     }
 
-    private void processAccountWithLock(final UUID accountId, final DateTime targetDate) throws InvoiceApiException {
+    private Invoice processAccountWithLock(final UUID accountId, final DateTime targetDate, boolean dryrun) throws InvoiceApiException {
 
         Account account = accountUserApi.getAccountById(accountId);
         if (account == null) {
             log.error("Failed handling entitlement change.",
                     new InvoiceApiException(ErrorCode.INVOICE_ACCOUNT_ID_INVALID, accountId.toString()));
-            return;
+            return null;
         }
 
         SortedSet<BillingEvent> events = entitlementBillingApi.getBillingEventsForAccount(accountId);
@@ -139,10 +143,12 @@ public class InvoiceDispatcher {
             }
             outputDebugData(events, invoiceItemList);
 
-            if (invoice.getNumberOfItems() > 0) {
+            if (invoice.getNumberOfItems() > 0 && !dryrun) {
                 invoiceDao.create(invoice);
             }
         }
+        
+        return invoice;
     }
 
     private void outputDebugData(Collection<BillingEvent> events, Collection<InvoiceItem> invoiceItemList) {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
index 6d419de..b11bfe2 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoice.java
@@ -184,7 +184,7 @@ public class DefaultInvoice implements Invoice {
             return true;
         }
 
-        return lastPaymentAttempt.plusDays(numberOfDays).isBefore(targetDate);
+        return !lastPaymentAttempt.plusDays(numberOfDays).isAfter(targetDate);
     }
 
     @Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
index 039f305..a8a1ecc 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
@@ -233,34 +233,4 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
             return fixedPriceInvoiceItem;
         }
     }
-
-//    // assumption: startDate is in the user's time zone
-//    private DateTime calculateSegmentEndDate(final DateTime startDate, final DateTime nextEndDate,
-//                                             final int billCycleDay, final BillingPeriod billingPeriod) {
-//        int dayOfMonth = startDate.getDayOfMonth();
-//        int maxDayOfMonth = startDate.dayOfMonth().getMaximumValue();
-//
-//        DateTime nextBillingDate;
-//
-//        // if the start date is not on the bill cycle day, move it to the nearest following date that works
-//        if ((billCycleDay > maxDayOfMonth) || (dayOfMonth == billCycleDay)) {
-//            nextBillingDate = startDate.plusMonths(billingPeriod.getNumberOfMonths());
-//        } else {
-//            MutableDateTime proposedDate = startDate.toMutableDateTime();
-//
-//            if (dayOfMonth < billCycleDay) {
-//                // move the end date forward to the bill cycle date (same month)
-//                int effectiveBillCycleDay = (billCycleDay > maxDayOfMonth) ? maxDayOfMonth : billCycleDay;
-//                nextBillingDate = proposedDate.dayOfMonth().set(effectiveBillCycleDay).toDateTime();
-//            } else {
-//                // go to the next month
-//                proposedDate = proposedDate.monthOfYear().add(1);
-//                maxDayOfMonth = proposedDate.dayOfMonth().getMaximumValue();
-//                int effectiveBillCycleDay = (billCycleDay > maxDayOfMonth) ? maxDayOfMonth : billCycleDay;
-//                nextBillingDate = proposedDate.dayOfMonth().set(effectiveBillCycleDay).toDateTime();
-//            }
-//        }
-//
-//        return nextBillingDate.isAfter(nextEndDate) ? nextEndDate : nextBillingDate;
-//    }
 }
\ No newline at end of file
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/InAdvanceBillingMode.java b/invoice/src/main/java/com/ning/billing/invoice/model/InAdvanceBillingMode.java
index 4d3dff9..916d514 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/InAdvanceBillingMode.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/InAdvanceBillingMode.java
@@ -149,7 +149,7 @@ public class InAdvanceBillingMode implements BillingMode {
         BigDecimal daysInPeriod = new BigDecimal(daysBetween);
         BigDecimal days = new BigDecimal(Days.daysBetween(startDate, nextBillingCycleDate).getDays());
 
-        return days.divide(daysInPeriod, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        return days.divide(daysInPeriod, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
     }
 
     private int calculateNumberOfWholeBillingPeriods(final DateTime startDate, final DateTime endDate, final BillingPeriod billingPeriod) {
@@ -232,6 +232,6 @@ public class InAdvanceBillingMode implements BillingMode {
 
         BigDecimal days = new BigDecimal(Days.daysBetween(previousBillThroughDate, endDate).getDays());
 
-        return days.divide(daysInPeriod, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        return days.divide(daysInPeriod, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
     }
 }
\ No newline at end of file
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 0dd5e3a..5107d71 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -48,15 +48,15 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
     private final EntitlementDao entitlementDao;
 
     private NotificationQueue nextBillingQueue;
-	private InvoiceListener listener;
+	private final InvoiceListener listener;
 
     @Inject
-	public DefaultNextBillingDateNotifier(NotificationQueueService notificationQueueService, 
+	public DefaultNextBillingDateNotifier(NotificationQueueService notificationQueueService,
 			InvoiceConfig config, EntitlementDao entitlementDao, InvoiceListener listener){
 		this.notificationQueueService = notificationQueueService;
 		this.config = config;
         this.entitlementDao = entitlementDao;
-        this.listener = listener; 
+        this.listener = listener;
 	}
 
     @Override
@@ -97,7 +97,7 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
                 }
                 @Override
                 public long getDaoClaimTimeMs() {
-                    return config.getDaoMaxReadyEvents();
+                    return config.getDaoClaimTimeMs();
                 }
             });
         } catch (NotificationQueueAlreadyExists e) {
@@ -121,5 +121,5 @@ public class DefaultNextBillingDateNotifier implements  NextBillingDateNotifier 
         listener.handleNextBillingDateEvent(subscriptionId, eventDateTime);
     }
 
- 
+
 }
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
index 90310d5..79fd3db 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
@@ -9,7 +9,8 @@ fields(prefix) ::= <<
   <prefix>start_date,
   <prefix>end_date,
   <prefix>amount,
-  <prefix>currency
+  <prefix>currency,
+  <prefix>created_date
 >>
 
 getById() ::= <<
@@ -40,13 +41,13 @@ getInvoiceItemsBySubscription() ::= <<
 create() ::= <<
   INSERT INTO fixed_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName,
-         :startDate, :endDate, :amount, :currency);
+         :startDate, :endDate, :amount, :currency, NOW());
 >>
 
 batchCreateFromTransaction() ::= <<
   INSERT INTO fixed_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName,
-         :startDate, :endDate, :amount, :currency);
+         :startDate, :endDate, :amount, :currency, NOW());
 >>
 
 update() ::= <<
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
index d50a1c5..a5291e8 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
@@ -11,7 +11,8 @@ fields(prefix) ::= <<
   <prefix>amount,
   <prefix>rate,
   <prefix>currency,
-  <prefix>reversed_item_id
+  <prefix>reversed_item_id,
+  <prefix>created_date
 >>
 
 getById() ::= <<
@@ -42,13 +43,13 @@ getInvoiceItemsBySubscription() ::= <<
 create() ::= <<
   INSERT INTO recurring_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
-         :amount, :rate, :currency, :reversedItemId);
+         :amount, :rate, :currency, :reversedItemId, NOW());
 >>
 
 batchCreateFromTransaction() ::= <<
   INSERT INTO recurring_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
-         :amount, :rate, :currency, :reversedItemId);
+         :amount, :rate, :currency, :reversedItemId, NOW());
 >>
 
 update() ::= <<
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql b/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
index c7712a8..e210044 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
+++ b/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
@@ -12,6 +12,7 @@ CREATE TABLE recurring_invoice_items (
   rate numeric(10,4) NULL,
   currency char(3) NOT NULL,
   reversed_item_id char(36),
+  created_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE INDEX recurring_invoice_items_subscription_id ON recurring_invoice_items(subscription_id ASC);
@@ -28,16 +29,13 @@ CREATE TABLE fixed_invoice_items (
   end_date datetime NOT NULL,
   amount numeric(10,4) NULL,
   currency char(3) NOT NULL,
+  created_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE INDEX fixed_invoice_items_subscription_id ON fixed_invoice_items(subscription_id ASC);
 CREATE INDEX fixed_invoice_items_invoice_id ON fixed_invoice_items(invoice_id ASC);
 
 DROP TABLE IF EXISTS invoice_locking;
-CREATE TABLE invoice_locking (
-  account_id char(36) NOT NULL,
-  PRIMARY KEY(account_id)
-) ENGINE = innodb;
 
 DROP TABLE IF EXISTS invoices;
 CREATE TABLE invoices (
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 6957ba0..9d7805a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -21,8 +21,13 @@ import static org.testng.Assert.fail;
 
 import java.io.IOException;
 
+import com.google.inject.Inject;
 import com.ning.billing.invoice.tests.InvoicingTestBase;
 import org.apache.commons.io.IOUtils;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -32,6 +37,7 @@ import com.google.inject.Stage;
 import com.ning.billing.invoice.glue.InvoiceModuleWithEmbeddedDb;
 import com.ning.billing.util.bus.BusService;
 import com.ning.billing.util.bus.DefaultBusService;
+import org.testng.annotations.BeforeMethod;
 
 public abstract class InvoiceDaoTestBase extends InvoicingTestBase {
     protected InvoiceDao invoiceDao;
@@ -70,6 +76,32 @@ public abstract class InvoiceDaoTestBase extends InvoicingTestBase {
         }
     }
 
+    @BeforeMethod(alwaysRun = true)
+    public void cleanupData() {
+        module.getDbi().inTransaction(new TransactionCallback<Void>() {
+            @Override
+            public Void inTransaction(Handle h, TransactionStatus status)
+                    throws Exception {
+                h.execute("truncate table accounts");
+                h.execute("truncate table entitlement_events");
+                h.execute("truncate table subscriptions");
+                h.execute("truncate table bundles");
+                h.execute("truncate table notifications");
+                h.execute("truncate table claimed_notifications");
+                h.execute("truncate table invoices");
+                h.execute("truncate table fixed_invoice_items");
+                h.execute("truncate table recurring_invoice_items");
+                h.execute("truncate table tag_definitions");
+                h.execute("truncate table tags");
+                h.execute("truncate table custom_fields");
+                h.execute("truncate table invoice_payments");
+                h.execute("truncate table payment_attempts");
+                h.execute("truncate table payments");
+                return null;
+            }
+        });
+    }
+
     @AfterClass(alwaysRun = true)
     protected void tearDown() {
         module.stopDb();
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
index 4e07a39..711665a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
@@ -169,7 +169,7 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
     @Test
     public void testGetInvoicesForPayment() {
         List<UUID> invoices;
-        DateTime notionalDate = new DateTime();
+        DateTime notionalDate = clock.getUTCNow();
 
         // create a new invoice with one item
         UUID accountId = UUID.randomUUID();
diff --git a/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithEmbeddedDb.java b/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithEmbeddedDb.java
index 4ab3495..bb9094e 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithEmbeddedDb.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithEmbeddedDb.java
@@ -23,6 +23,7 @@ import com.ning.billing.invoice.api.test.DefaultInvoiceTestApi;
 import com.ning.billing.invoice.dao.InvoicePaymentSqlDao;
 import com.ning.billing.invoice.dao.RecurringInvoiceItemSqlDao;
 import com.ning.billing.util.glue.GlobalLockerModule;
+import com.ning.billing.util.notificationq.NotificationConfig;
 import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.account.glue.AccountModule;
 import com.ning.billing.catalog.glue.CatalogModule;
@@ -50,6 +51,10 @@ public class InvoiceModuleWithEmbeddedDb extends InvoiceModule {
         helper.stopMysql();
     }
 
+    public IDBI getDbi() {
+        return dbi;
+    }
+
     public RecurringInvoiceItemSqlDao getInvoiceItemSqlDao() {
         return dbi.onDemand(RecurringInvoiceItemSqlDao.class);
     }
@@ -80,4 +85,23 @@ public class InvoiceModuleWithEmbeddedDb extends InvoiceModule {
 
         install(new BusModule());
     }
+
+    private class TestNotificationConfig implements NotificationConfig {
+        @Override
+        public boolean isNotificationProcessingOff() {
+            return false;
+        }
+        @Override
+        public long getNotificationSleepTimeMs() {
+            return 10;
+        }
+        @Override
+        public int getDaoMaxReadyEvents() {
+            return 1;
+        }
+        @Override
+        public long getDaoClaimTimeMs() {
+            return 60000;
+        }
+    }
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithMocks.java b/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithMocks.java
index b180a03..01f0eb2 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithMocks.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/glue/InvoiceModuleWithMocks.java
@@ -31,6 +31,11 @@ public class InvoiceModuleWithMocks extends InvoiceModule {
     }
 
     @Override
+    protected void installGlobalLocker() {
+        bind(GlobalLocker.class).to(MockGlobalLocker.class).asEagerSingleton();
+    }
+
+    @Override
     protected void installInvoiceListener() {
 
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/MockModule.java b/invoice/src/test/java/com/ning/billing/invoice/MockModule.java
new file mode 100644
index 0000000..ec96f60
--- /dev/null
+++ b/invoice/src/test/java/com/ning/billing/invoice/MockModule.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice;
+
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.IDBI;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.account.glue.AccountModule;
+import com.ning.billing.catalog.glue.CatalogModule;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.entitlement.glue.EntitlementModule;
+import com.ning.billing.invoice.glue.InvoiceModule;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.glue.BusModule;
+import com.ning.billing.util.glue.GlobalLockerModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
+
+
+public class MockModule extends AbstractModule {
+
+
+    public static final String PLUGIN_NAME = "yoyo";
+
+    @Override
+    protected void configure() {
+
+        bind(Clock.class).to(ClockMock.class).asEagerSingleton();
+        bind(ClockMock.class).asEagerSingleton();
+
+        final MysqlTestingHelper helper = new MysqlTestingHelper();
+        bind(MysqlTestingHelper.class).toInstance(helper);
+        if (helper.isUsingLocalInstance()) {
+            bind(IDBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+        } else {
+            final IDBI dbi = helper.getDBI();
+            bind(IDBI.class).toInstance(dbi);
+        }
+
+        install(new GlobalLockerModule());
+        install(new NotificationQueueModule());
+        install(new InvoiceModule());
+        install(new AccountModule());
+        install(new EntitlementModule());
+        install(new CatalogModule());
+        install(new BusModule());
+
+    }
+
+ 
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
new file mode 100644
index 0000000..987dd4d
--- /dev/null
+++ b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountUserApi;
+import com.ning.billing.catalog.MockInternationalPrice;
+import com.ning.billing.catalog.MockPlan;
+import com.ning.billing.catalog.MockPlanPhase;
+import com.ning.billing.catalog.api.BillingPeriod;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.catalog.api.InternationalPrice;
+import com.ning.billing.catalog.api.Plan;
+import com.ning.billing.catalog.api.PlanPhase;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.entitlement.api.billing.BillingEvent;
+import com.ning.billing.entitlement.api.billing.BillingModeType;
+import com.ning.billing.entitlement.api.billing.DefaultBillingEvent;
+import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionTransition.SubscriptionTransitionType;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceApiException;
+import com.ning.billing.invoice.api.InvoiceUserApi;
+import com.ning.billing.invoice.dao.InvoiceDao;
+import com.ning.billing.invoice.model.InvoiceGenerator;
+import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.mock.BrainDeadProxyFactory;
+import com.ning.billing.mock.BrainDeadProxyFactory.ZombieControl;
+import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.globallocker.GlobalLocker;
+
+@Guice(modules = {MockModule.class})
+public class TestInvoiceDispatcher {
+
+    @Inject
+    InvoiceUserApi invoiceUserApi;
+    @Inject
+ 	private InvoiceGenerator generator;
+    @Inject
+    private InvoiceDao invoiceDao;
+    @Inject
+    private GlobalLocker locker;
+    
+    @Inject
+    private MysqlTestingHelper helper;
+    
+    @Inject
+    NextBillingDateNotifier notifier;
+    
+    @Inject
+    private BusService busService;
+
+
+
+    @BeforeSuite(alwaysRun = true)
+    public void setup() throws IOException
+    {
+
+
+        final String accountDdl = IOUtils.toString(TestInvoiceDispatcher.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
+        final String entitlementDdl = IOUtils.toString(TestInvoiceDispatcher.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
+        final String invoiceDdl = IOUtils.toString(TestInvoiceDispatcher.class.getResourceAsStream("/com/ning/billing/invoice/ddl.sql"));
+//        final String paymentDdl = IOUtils.toString(TestInvoiceDispatcher.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
+        final String utilDdl = IOUtils.toString(TestInvoiceDispatcher.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+
+        helper.startMysql();
+
+        helper.initDb(accountDdl);
+        helper.initDb(entitlementDdl);
+        helper.initDb(invoiceDdl);
+//        helper.initDb(paymentDdl);
+        helper.initDb(utilDdl);
+        notifier.initialize();
+        notifier.start();
+        
+        busService.getBus().start();
+    }
+
+
+	    @Test(groups={"fast"}, enabled=true)
+	    public void testDryrunInvoice() throws InvoiceApiException {
+	    	UUID accountId = UUID.randomUUID();
+	    	UUID subscriptionId = UUID.randomUUID();
+
+	    	AccountUserApi accountUserApi = BrainDeadProxyFactory.createBrainDeadProxyFor(AccountUserApi.class);
+	    	Account account = BrainDeadProxyFactory.createBrainDeadProxyFor(Account.class);
+	    	((ZombieControl)accountUserApi).addResult("getAccountById", account);
+	    	((ZombieControl)account).addResult("getCurrency", Currency.USD);
+	    	((ZombieControl)account).addResult("getId", accountId);
+	    	
+	    	Subscription subscription =  BrainDeadProxyFactory.createBrainDeadProxyFor(Subscription.class);
+	    	((ZombieControl)subscription).addResult("getId", subscriptionId);
+	    	SortedSet<BillingEvent> events = new TreeSet<BillingEvent>();
+	    	Plan plan = MockPlan.createBicycleNoTrialEvergreen1USD();
+	    	PlanPhase planPhase = MockPlanPhase.create1USDMonthlyEvergreen();
+			DateTime effectiveDate = new DateTime().minusDays(1);
+			InternationalPrice reccurringPrice = MockInternationalPrice.create1USD();
+			InternationalPrice fixedPrice = null;
+			events.add(new DefaultBillingEvent(subscription, effectiveDate,plan,planPhase, fixedPrice , reccurringPrice, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,"", SubscriptionTransitionType.CREATE));
+	    	EntitlementBillingApi entitlementBillingApi = BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
+	    	((ZombieControl)entitlementBillingApi).addResult("getBillingEventsForAccount", events);
+	    	
+	    	
+	    	DateTime target = new DateTime();
+	    	
+	    	InvoiceDispatcher dispatcher = new InvoiceDispatcher(generator, accountUserApi, entitlementBillingApi, invoiceDao, locker);
+	    	
+	    	Invoice invoice = dispatcher.processAccount(accountId, target, true);
+	    	Assert.assertNotNull(invoice);
+	    	
+	    	List<Invoice> invoices = invoiceDao.getInvoicesByAccount(accountId);
+	    	Assert.assertEquals(invoices.size(),0);
+	    	
+	    	// Try it again to double check
+	    	invoice = dispatcher.processAccount(accountId, target, true);
+	    	Assert.assertNotNull(invoice);
+	    	
+	    	invoices = invoiceDao.getInvoicesByAccount(accountId);
+	    	Assert.assertEquals(invoices.size(),0);
+	    	
+	    	// This time no dry run
+	    	invoice = dispatcher.processAccount(accountId, target, false);
+	    	Assert.assertNotNull(invoice);
+	    	
+	    	invoices = invoiceDao.getInvoicesByAccount(accountId);
+	    	Assert.assertEquals(invoices.size(),1);
+	    	
+	    }
+}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
index f0de1ce..136eeb5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/DefaultInvoiceGeneratorTests.java
@@ -59,8 +59,6 @@ import static org.testng.Assert.assertNull;
 
 @Test(groups = {"fast", "invoicing", "invoiceGenerator"})
 public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
-
-
     private final InvoiceGenerator generator = new DefaultInvoiceGenerator(new DefaultClock());
 
     @Test
@@ -131,8 +129,8 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         assertEquals(invoice.getNumberOfItems(), 2);
 
         BigDecimal expectedNumberOfBillingCycles;
-        expectedNumberOfBillingCycles = ONE.add(FOURTEEN.divide(THIRTY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
-        BigDecimal expectedAmount = expectedNumberOfBillingCycles.multiply(rate).setScale(NUMBER_OF_DECIMALS);
+        expectedNumberOfBillingCycles = ONE.add(FOURTEEN.divide(THIRTY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        BigDecimal expectedAmount = expectedNumberOfBillingCycles.multiply(rate).setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         assertEquals(invoice.getTotalAmount(), expectedAmount);
     }
 
@@ -192,14 +190,14 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         assertEquals(invoice.getNumberOfItems(), 4);
 
         BigDecimal numberOfCyclesEvent1;
-        numberOfCyclesEvent1 = ONE.add(FOURTEEN.divide(THIRTY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        numberOfCyclesEvent1 = ONE.add(FOURTEEN.divide(THIRTY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
 
         BigDecimal numberOfCyclesEvent2 = TWO;
 
         BigDecimal expectedValue;
         expectedValue = numberOfCyclesEvent1.multiply(rate1);
         expectedValue = expectedValue.add(numberOfCyclesEvent2.multiply(rate2));
-        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS);
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         assertEquals(invoice.getTotalAmount(), expectedValue);
     }
@@ -373,7 +371,7 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
 
         // on 6/21/2011, create add-on (subscription 5)
         events.add(createBillingEvent(subscriptionId5, plan5StartDate, plan5, plan5Phase1, 10));
-        expectedAmount = TWENTY.multiply(NINETEEN.divide(THIRTY, NUMBER_OF_DECIMALS, ROUNDING_METHOD)).setScale(NUMBER_OF_DECIMALS);
+        expectedAmount = TWENTY.multiply(NINETEEN).divide(THIRTY, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         testInvoiceGeneration(events, invoiceItems, plan5StartDate, 1, expectedAmount);
 
         // on 7/7/2011, invoice subscription 4 (plan 1)
@@ -391,8 +389,8 @@ public class DefaultInvoiceGeneratorTests extends InvoicingTestBase {
         // on 7/31/2011, convert subscription 3 to annual
         events.add(createBillingEvent(subscriptionId3, plan3UpgradeToAnnualDate, plan3, plan3Phase2, 31));
         expectedAmount = ONE_HUNDRED.subtract(TEN);
-        expectedAmount = expectedAmount.add(TEN.multiply(ELEVEN.divide(THIRTY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD)));
-        expectedAmount = expectedAmount.setScale(NUMBER_OF_DECIMALS);
+        expectedAmount = expectedAmount.add(TEN.multiply(ELEVEN.divide(THIRTY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD)));
+        expectedAmount = expectedAmount.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         testInvoiceGeneration(events, invoiceItems, plan3UpgradeToAnnualDate, 3, expectedAmount);
 
         // on 8/7/2011, invoice subscription 4 (plan 2)
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
index 64fa95c..fd37599 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/annual/DoubleProRationTests.java
@@ -81,9 +81,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2012, 1, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -95,9 +96,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2012, 1, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -109,9 +111,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2012, 1, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -123,9 +126,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2012, 1, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(THREE_HUNDRED_AND_SIXTY_FIVE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(THREE_HUNDRED_AND_SIXTY_SIX, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
index c1d6085..184f5d5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/inAdvance/quarterly/DoubleProRationTests.java
@@ -37,7 +37,8 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime targetDate = buildDateTime(2011, 1, 1);
         DateTime endDate = buildDateTime(2011, 4, 27);
 
-        BigDecimal expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        BigDecimal expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
 
@@ -47,7 +48,8 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime targetDate = buildDateTime(2011, 1, 7);
         DateTime endDate = buildDateTime(2011, 4, 27);
 
-        BigDecimal expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        BigDecimal expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
 
@@ -57,7 +59,8 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime targetDate = buildDateTime(2011, 1, 15);
         DateTime endDate = buildDateTime(2011, 4, 27);
 
-        BigDecimal expectedValue = ONE.add(FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        BigDecimal expectedValue = ONE.add(FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
 
@@ -68,8 +71,9 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 4, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -81,9 +85,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 4, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -95,9 +100,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 4, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -109,9 +115,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 4, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -123,9 +130,10 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 4, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(ONE);
-        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.add(TWELVE.divide(NINETY_ONE, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD));
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
@@ -137,8 +145,9 @@ public class DoubleProRationTests extends ProRationInAdvanceTestBase {
         DateTime endDate = buildDateTime(2011, 8, 27);
 
         BigDecimal expectedValue;
-        expectedValue = FOURTEEN.divide(NINETY_TWO, NUMBER_OF_DECIMALS, ROUNDING_METHOD);
+        expectedValue = FOURTEEN.divide(NINETY_TWO, 2 * NUMBER_OF_DECIMALS, ROUNDING_METHOD);
         expectedValue = expectedValue.add(TWO);
+        expectedValue = expectedValue.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
 
         testCalculateNumberOfBillingCycles(startDate, endDate, targetDate, 15, expectedValue);
     }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/ProRationTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/tests/ProRationTestBase.java
index a24b0c0..122b13b 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/ProRationTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/ProRationTestBase.java
@@ -66,7 +66,7 @@ public abstract class ProRationTestBase extends InvoicingTestBase {
             numberOfBillingCycles = numberOfBillingCycles.add(item.getNumberOfCycles());
         }
 
-        return numberOfBillingCycles;
+        return numberOfBillingCycles.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
     }
 
     protected BigDecimal calculateNumberOfBillingCycles(DateTime startDate, DateTime targetDate, int billingCycleDay) throws InvalidDateSequenceException {
@@ -77,6 +77,6 @@ public abstract class ProRationTestBase extends InvoicingTestBase {
             numberOfBillingCycles = numberOfBillingCycles.add(item.getNumberOfCycles());
         }
 
-        return numberOfBillingCycles;
+        return numberOfBillingCycles.setScale(NUMBER_OF_DECIMALS, ROUNDING_METHOD);
     }
 }
\ No newline at end of file

payment/pom.xml 9(+2 -7)

diff --git a/payment/pom.xml b/payment/pom.xml
index bdb30c8..41218d1 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-payment</artifactId>
@@ -62,7 +62,7 @@
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
-            <version>2.0.1</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>commons-collections</groupId>
@@ -86,11 +86,6 @@
         <dependency>
             <groupId>com.ning.billing</groupId>
             <artifactId>killbill-util</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.ning.billing</groupId>
-            <artifactId>killbill-util</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index a8364bf..3b1a6f9 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -23,6 +23,9 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.lang.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +38,14 @@ import com.ning.billing.invoice.model.DefaultInvoicePayment;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.PaymentProviderPlugin;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.setup.PaymentConfig;
 
 public class DefaultPaymentApi implements PaymentApi {
     private final PaymentProviderPluginRegistry pluginRegistry;
     private final AccountUserApi accountUserApi;
     private final InvoicePaymentApi invoicePaymentApi;
     private final PaymentDao paymentDao;
+    private final PaymentConfig config;
 
     private static final Logger log = LoggerFactory.getLogger(DefaultPaymentApi.class);
 
@@ -48,11 +53,13 @@ public class DefaultPaymentApi implements PaymentApi {
     public DefaultPaymentApi(PaymentProviderPluginRegistry pluginRegistry,
                              AccountUserApi accountUserApi,
                              InvoicePaymentApi invoicePaymentApi,
-                             PaymentDao paymentDao) {
+                             PaymentDao paymentDao,
+                             PaymentConfig config) {
         this.pluginRegistry = pluginRegistry;
         this.accountUserApi = accountUserApi;
         this.invoicePaymentApi = invoicePaymentApi;
         this.paymentDao = paymentDao;
+        this.config = config;
     }
 
     @Override
@@ -127,6 +134,28 @@ public class DefaultPaymentApi implements PaymentApi {
     }
 
     @Override
+    public Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId) {
+        PaymentAttempt paymentAttempt = paymentDao.getPaymentAttemptById(paymentAttemptId);
+
+        if (paymentAttempt != null) {
+            Invoice invoice = invoicePaymentApi.getInvoice(paymentAttempt.getInvoiceId());
+            Account account = accountUserApi.getAccountById(paymentAttempt.getAccountId());
+
+            if (invoice != null && account != null) {
+                if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
+                    // TODO: send a notification that invoice was ignored?
+                    log.info("Received invoice for payment with outstanding amount of 0 {} ", invoice);
+                    Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
+                }
+                else {
+                    return processPayment(getPaymentProviderPlugin(account), account, invoice, paymentAttempt);
+                }
+            }
+        }
+        return Either.left(new PaymentError("retry_payment_error", "Could not load payment attempt, invoice or account for id " + paymentAttemptId));
+    }
+
+    @Override
     public List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds) {
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
 
@@ -136,56 +165,93 @@ public class DefaultPaymentApi implements PaymentApi {
             Invoice invoice = invoicePaymentApi.getInvoice(UUID.fromString(invoiceId));
 
             if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
-            // TODO: send a notification that invoice was ignored?
-                log.info("Received invoice for payment with outstanding amount of 0 {} ", invoice);
-            }
-            else if (invoiceId.equals(paymentDao.getPaymentAttemptForInvoiceId(invoiceId))) {
-                //TODO: do equals on invoice instead and only reject when invoice is exactly the same?
-                log.info("Duplicate invoice payment event, already received invoice {} ", invoice);
+                // TODO: send a notification that invoice was ignored?
+                log.info("Received invoice for payment with balance of 0 {} ", invoice);
+                Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
             }
             else {
                 PaymentAttempt paymentAttempt = paymentDao.createPaymentAttempt(invoice);
-                Either<PaymentError, PaymentInfo> paymentOrError = plugin.processInvoice(account, invoice);
-                processedPaymentsOrErrors.add(paymentOrError);
 
-                PaymentInfo paymentInfo = null;
+                processedPaymentsOrErrors.add(processPayment(plugin, account, invoice, paymentAttempt));
+            }
+        }
 
-                if (paymentOrError.isRight()) {
-                    paymentInfo = paymentOrError.getRight();
-                    paymentDao.savePaymentInfo(paymentInfo);
+        return processedPaymentsOrErrors;
+    }
 
-                    Either<PaymentError, PaymentMethodInfo> paymentMethodInfoOrError = plugin.getPaymentMethodInfo(paymentInfo.getPaymentMethodId());
+    private Either<PaymentError, PaymentInfo> processPayment(PaymentProviderPlugin plugin, Account account, Invoice invoice, PaymentAttempt paymentAttempt) {
+        Either<PaymentError, PaymentInfo> paymentOrError = plugin.processInvoice(account, invoice);
+        PaymentInfo paymentInfo = null;
 
-                    if (paymentMethodInfoOrError.isRight()) {
-                        PaymentMethodInfo paymentMethodInfo = paymentMethodInfoOrError.getRight();
+        if (paymentOrError.isLeft()) {
+            String error = StringUtils.substring(paymentOrError.getLeft().getMessage() + paymentOrError.getLeft().getType(), 0, 100);
+            log.info("Could not process a payment for " + paymentAttempt + " error was " + error);
 
-                        if (paymentMethodInfo instanceof CreditCardPaymentMethodInfo) {
-                            CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethodInfo;
-                            paymentDao.updatePaymentInfo(ccPaymentMethod.getType(), paymentInfo.getPaymentId(), ccPaymentMethod.getCardType(), ccPaymentMethod.getCardCountry());
-                        }
-                        else if (paymentMethodInfo instanceof PaypalPaymentMethodInfo) {
-                            PaypalPaymentMethodInfo paypalPaymentMethodInfo = (PaypalPaymentMethodInfo)paymentMethodInfo;
-                            paymentDao.updatePaymentInfo(paypalPaymentMethodInfo.getType(), paymentInfo.getPaymentId(), null, null);
-                        }
-                    }
+            scheduleRetry(paymentAttempt, error);
+        }
+        else {
+            paymentInfo = paymentOrError.getRight();
+            paymentDao.savePaymentInfo(paymentInfo);
 
+            Either<PaymentError, PaymentMethodInfo> paymentMethodInfoOrError = plugin.getPaymentMethodInfo(paymentInfo.getPaymentMethodId());
 
-                    if (paymentInfo.getPaymentId() != null) {
-                        paymentDao.updatePaymentAttemptWithPaymentId(paymentAttempt.getPaymentAttemptId(), paymentInfo.getPaymentId());
-                    }
-                }
+            if (paymentMethodInfoOrError.isRight()) {
+                PaymentMethodInfo paymentMethodInfo = paymentMethodInfoOrError.getRight();
 
-                invoicePaymentApi.notifyOfPaymentAttempt(new DefaultInvoicePayment(paymentAttempt.getPaymentAttemptId(),
-                                                                                   invoice.getId(),
-                                                                                   paymentAttempt.getPaymentAttemptDate(),
-                                                                                   paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : paymentInfo.getAmount(),
-//                                                                                 paymentInfo.getRefundAmount(), TODO
-                                                                                   paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : invoice.getCurrency()));
+                if (paymentMethodInfo instanceof CreditCardPaymentMethodInfo) {
+                    CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethodInfo;
+                    paymentDao.updatePaymentInfo(ccPaymentMethod.getType(), paymentInfo.getPaymentId(), ccPaymentMethod.getCardType(), ccPaymentMethod.getCardCountry());
+                }
+                else if (paymentMethodInfo instanceof PaypalPaymentMethodInfo) {
+                    PaypalPaymentMethodInfo paypalPaymentMethodInfo = (PaypalPaymentMethodInfo)paymentMethodInfo;
+                    paymentDao.updatePaymentInfo(paypalPaymentMethodInfo.getType(), paymentInfo.getPaymentId(), null, null);
+                }
+            }
 
+            if (paymentInfo.getPaymentId() != null) {
+                paymentDao.updatePaymentAttemptWithPaymentId(paymentAttempt.getPaymentAttemptId(), paymentInfo.getPaymentId());
             }
         }
 
-        return processedPaymentsOrErrors;
+        invoicePaymentApi.notifyOfPaymentAttempt(new DefaultInvoicePayment(paymentAttempt.getPaymentAttemptId(),
+                                                                           invoice.getId(),
+                                                                           paymentAttempt.getPaymentAttemptDate(),
+                                                                           paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : paymentInfo.getAmount(),
+//                                                                         paymentInfo.getRefundAmount(), TODO
+                                                                           paymentInfo == null || paymentInfo.getStatus().equalsIgnoreCase("Error") ? null : invoice.getCurrency()));
+
+        return paymentOrError;
+    }
+
+    private void scheduleRetry(PaymentAttempt paymentAttempt, String error) {
+        final List<Integer> retryDays = config.getPaymentRetryDays();
+
+        int retryCount = 0;
+
+        if (paymentAttempt.getRetryCount() != null) {
+            retryCount = paymentAttempt.getRetryCount();
+        }
+
+        if (retryCount < retryDays.size()) {
+            int retryInDays = 0;
+            DateTime nextRetryDate = new DateTime(DateTimeZone.UTC);
+
+            try {
+                retryInDays = retryDays.get(retryCount);
+                nextRetryDate = nextRetryDate.plusDays(retryInDays);
+            }
+            catch (NumberFormatException ex) {
+                log.error("Could not get retry day for retry count {}", retryCount);
+            }
+
+            paymentDao.updatePaymentAttemptWithRetryInfo(paymentAttempt.getPaymentAttemptId(), retryCount + 1, nextRetryDate);
+        }
+        else if (retryCount == retryDays.size()) {
+            log.info("Last payment retry failed for {} ", paymentAttempt);
+        }
+        else {
+            log.error("Cannot update payment retry information because retry count is invalid {} ", retryCount);
+        }
     }
 
     @Override
@@ -212,4 +278,19 @@ public class DefaultPaymentApi implements PaymentApi {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public List<PaymentInfo> getPaymentInfo(List<String> invoiceIds) {
+        return paymentDao.getPaymentInfo(invoiceIds);
+    }
+
+    @Override
+    public PaymentAttempt getPaymentAttemptForInvoiceId(String invoiceId) {
+        return paymentDao.getPaymentAttemptForInvoiceId(invoiceId);
+    }
+
+    @Override
+    public PaymentInfo getPaymentInfoForPaymentAttemptId(String paymentAttemptId) {
+        return paymentDao.getPaymentInfoForPaymentAttemptId(paymentAttemptId);
+    }
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/api/PaymentStatus.java b/payment/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
new file mode 100644
index 0000000..c90a330
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment.api;
+
+public enum PaymentStatus {
+    Pending,
+    Created,
+    Completed,
+    Processed,
+    Incomplete,
+    Error,
+    Reversalerror,
+    Processing,
+    Expired,
+    Unknown
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
index eacc226..a86db13 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
@@ -16,8 +16,10 @@
 
 package com.ning.billing.payment.dao;
 
+import java.util.List;
 import java.util.UUID;
 
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 
 import com.google.inject.Inject;
@@ -44,6 +46,12 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
+    public PaymentAttempt createPaymentAttempt(PaymentAttempt paymentAttempt) {
+        sqlDao.insertPaymentAttempt(paymentAttempt);
+        return paymentAttempt;
+    }
+
+    @Override
     public PaymentAttempt createPaymentAttempt(Invoice invoice) {
         final PaymentAttempt paymentAttempt = new PaymentAttempt(UUID.randomUUID(), invoice);
 
@@ -66,4 +74,29 @@ public class DefaultPaymentDao implements PaymentDao {
         sqlDao.updatePaymentInfo(type, paymentId, cardType, cardCountry);
     }
 
+    @Override
+    public List<PaymentInfo> getPaymentInfo(List<String> invoiceIds) {
+        return sqlDao.getPaymentInfos(invoiceIds);
+    }
+
+    @Override
+    public List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(List<String> invoiceIds) {
+        return sqlDao.getPaymentAttemptsForInvoiceIds(invoiceIds);
+    }
+
+    @Override
+    public void updatePaymentAttemptWithRetryInfo(UUID paymentAttemptId, int retryCount, DateTime nextRetryDate) {
+        sqlDao.updatePaymentAttemptWithRetryInfo(paymentAttemptId.toString(), retryCount, nextRetryDate);
+    }
+
+    @Override
+    public PaymentAttempt getPaymentAttemptById(UUID paymentAttemptId) {
+        return sqlDao.getPaymentAttemptById(paymentAttemptId.toString());
+    }
+
+    @Override
+    public PaymentInfo getPaymentInfoForPaymentAttemptId(String paymentAttemptIdStr) {
+        return sqlDao.getPaymentInfoForPaymentAttemptId(paymentAttemptIdStr);
+    }
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
index 5cf065b..de2d8cc 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
@@ -16,8 +16,11 @@
 
 package com.ning.billing.payment.dao;
 
+import java.util.List;
 import java.util.UUID;
 
+import org.joda.time.DateTime;
+
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
@@ -25,15 +28,23 @@ import com.ning.billing.payment.api.PaymentInfo;
 public interface PaymentDao {
 
     PaymentAttempt createPaymentAttempt(Invoice invoice);
+    PaymentAttempt createPaymentAttempt(PaymentAttempt paymentAttempt);
 
     void savePaymentInfo(PaymentInfo right);
 
     PaymentAttempt getPaymentAttemptForPaymentId(String paymentId);
+    List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(List<String> invoiceIds);
 
     void updatePaymentAttemptWithPaymentId(UUID paymentAttemptId, String paymentId);
+    void updatePaymentAttemptWithRetryInfo(UUID paymentAttemptId, int retryCount, DateTime nextRetryDate);
 
     PaymentAttempt getPaymentAttemptForInvoiceId(String invoiceId);
 
     void updatePaymentInfo(String paymentMethodType, String paymentId, String cardType, String cardCountry);
 
+    List<PaymentInfo> getPaymentInfo(List<String> invoiceIds);
+
+    PaymentAttempt getPaymentAttemptById(UUID paymentAttemptId);
+    PaymentInfo getPaymentInfoForPaymentAttemptId(String paymentAttemptId);
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
index 972ee64..9919d34 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
@@ -21,6 +21,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -37,6 +38,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
+import org.skife.jdbi.v2.unstable.BindIn;
 
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.payment.api.PaymentAttempt;
@@ -53,18 +55,39 @@ public interface PaymentSqlDao extends Transactional<PaymentSqlDao>, CloseMe, Tr
 
     @SqlQuery
     @Mapper(PaymentAttemptMapper.class)
+    PaymentAttempt getPaymentAttemptById(@Bind("payment_attempt_id") String paymentAttemptId);
+
+    @SqlQuery
+    @Mapper(PaymentAttemptMapper.class)
     PaymentAttempt getPaymentAttemptForInvoiceId(@Bind("invoice_id") String invoiceId);
 
+    @SqlQuery
+    @Mapper(PaymentAttemptMapper.class)
+    List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(@BindIn("invoiceIds") List<String> invoiceIds);
+
+    @SqlQuery
+    @Mapper(PaymentInfoMapper.class)
+    PaymentInfo getPaymentInfoForPaymentAttemptId(@Bind("payment_attempt_id") String paymentAttemptId);
+
     @SqlUpdate
     void updatePaymentAttemptWithPaymentId(@Bind("payment_attempt_id") String paymentAttemptId,
                                            @Bind("payment_id") String paymentId);
 
     @SqlUpdate
+    void updatePaymentAttemptWithRetryInfo(@Bind("payment_attempt_id") String paymentAttemptId,
+                                           @Bind("retry_count") int retryCount,
+                                           @Bind("next_retry_dt") DateTime nextRetryDate);
+
+    @SqlUpdate
     void updatePaymentInfo(@Bind("payment_method") String paymentMethod,
                            @Bind("payment_id") String paymentId,
                            @Bind("card_type") String cardType,
                            @Bind("card_country") String cardCountry);
 
+    @SqlQuery
+    @Mapper(PaymentInfoMapper.class)
+    List<PaymentInfo> getPaymentInfos(@BindIn("invoiceIds") List<String> invoiceIds);
+
     @SqlUpdate
     void insertPaymentInfo(@Bind(binder = PaymentInfoBinder.class) PaymentInfo paymentInfo);
 
diff --git a/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java b/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
index 892d424..d53c236 100644
--- a/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
@@ -31,7 +31,6 @@ import com.ning.billing.payment.api.Either;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentError;
 import com.ning.billing.payment.api.PaymentInfo;
-import com.ning.billing.payment.provider.PaymentProviderPlugin;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.Bus.EventBusException;
@@ -40,7 +39,6 @@ public class RequestProcessor {
     public static final String PAYMENT_PROVIDER_KEY = "paymentProvider";
     private final AccountUserApi accountUserApi;
     private final PaymentApi paymentApi;
-    private final PaymentProviderPluginRegistry pluginRegistry;
     private final Bus eventBus;
 
     private static final Logger log = LoggerFactory.getLogger(RequestProcessor.class);
@@ -52,7 +50,6 @@ public class RequestProcessor {
                             Bus eventBus) {
         this.accountUserApi = accountUserApi;
         this.paymentApi = paymentApi;
-        this.pluginRegistry = pluginRegistry;
         this.eventBus = eventBus;
     }
 
@@ -77,20 +74,4 @@ public class RequestProcessor {
             throw new RuntimeException(ex);
         }
     }
-
-    @Subscribe
-    public void receivePaymentInfoRequest(PaymentInfoRequest paymentInfoRequest) throws EventBusException {
-        final Account account = accountUserApi.getAccountById(paymentInfoRequest.getAccountId());
-        if (account == null) {
-            log.info("could not process payment info request: could not find a valid account for event {}", paymentInfoRequest);
-        }
-        else {
-            final String paymentProviderName = account.getFieldValue(PAYMENT_PROVIDER_KEY);
-            final PaymentProviderPlugin plugin = pluginRegistry.getPlugin(paymentProviderName);
-
-            Either<PaymentError, PaymentInfo> result = plugin.getPaymentInfo(paymentInfoRequest.getPaymentId());
-
-            eventBus.post(result.isLeft() ? result.getLeft() : result.getRight());
-        }
-    }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/RetryService.java b/payment/src/main/java/com/ning/billing/payment/RetryService.java
new file mode 100644
index 0000000..bbe8a61
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/RetryService.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.google.inject.Inject;
+import com.ning.billing.lifecycle.KillbillService;
+import com.ning.billing.lifecycle.LifecycleHandlerType;
+import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.payment.api.PaymentApi;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.api.PaymentStatus;
+import com.ning.billing.payment.setup.PaymentConfig;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class RetryService implements KillbillService {
+    public static final String SERVICE_NAME = "retry-service";
+    public static final String QUEUE_NAME = "retry-events";
+
+    private final NotificationQueueService notificationQueueService;
+    private final PaymentConfig config;
+    private final PaymentApi paymentApi;
+    private NotificationQueue retryQueue;
+
+    @Inject
+    public RetryService(NotificationQueueService notificationQueueService,
+                        PaymentConfig config,
+                        PaymentApi paymentApi) {
+        this.notificationQueueService = notificationQueueService;
+        this.paymentApi = paymentApi;
+        this.config = config;
+    }
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() throws NotificationQueueAlreadyExists {
+        retryQueue = notificationQueueService.createNotificationQueue(SERVICE_NAME, QUEUE_NAME, new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
+                retry(notificationKey);
+            }
+        },
+        config);
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        retryQueue.startQueue();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() {
+        if (retryQueue != null) {
+            retryQueue.stopQueue();
+         }
+    }
+
+    public void scheduleRetry(Transmogrifier transactionalDao, PaymentAttempt paymentAttempt, DateTime timeOfRetry) {
+        final String id = paymentAttempt.getPaymentAttemptId().toString();
+
+        NotificationKey key = new NotificationKey() {
+            @Override
+            public String toString() {
+                return id;
+            }
+        };
+        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key);
+    }
+
+    private void retry(String paymentAttemptId) {
+        PaymentInfo paymentInfo = paymentApi.getPaymentInfoForPaymentAttemptId(paymentAttemptId);
+
+        if (paymentInfo != null && !PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
+            paymentApi.createPayment(UUID.fromString(paymentAttemptId));
+        }
+    }
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/setup/PaymentConfig.java b/payment/src/main/java/com/ning/billing/payment/setup/PaymentConfig.java
index cdc5384..46f00fc 100644
--- a/payment/src/main/java/com/ning/billing/payment/setup/PaymentConfig.java
+++ b/payment/src/main/java/com/ning/billing/payment/setup/PaymentConfig.java
@@ -16,11 +16,37 @@
 
 package com.ning.billing.payment.setup;
 
+import java.util.List;
+
 import org.skife.config.Config;
-import org.skife.config.DefaultNull;
+import org.skife.config.Default;
+
+import com.ning.billing.util.notificationq.NotificationConfig;
 
-public interface PaymentConfig {
+public interface PaymentConfig extends NotificationConfig {
     @Config("killbill.payment.provider.default")
-    @DefaultNull
+    @Default("noop")
     public String getDefaultPaymentProvider();
+
+    @Config("killbill.payment.retry.days")
+    @Default("8,8,8")
+    public List<Integer> getPaymentRetryDays();
+
+    @Config("killbill.payment.dao.claim.time")
+    @Default("60000")
+    public long getDaoClaimTimeMs();
+
+    @Config("killbill.payment.dao.ready.max")
+    @Default("10")
+    public int getDaoMaxReadyEvents();
+
+    @Config("killbill.payment.engine.notifications.sleep")
+    @Default("500")
+    public long getNotificationSleepTimeMs();
+
+    @Config("killbill.payment.engine.events.off")
+    // turn off payment retries by default
+    @Default("true")
+    public boolean isNotificationProcessingOff();
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
index 935b968..85641ed 100644
--- a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
@@ -22,6 +22,7 @@ import org.skife.config.ConfigurationObjectFactory;
 
 import com.google.inject.AbstractModule;
 import com.ning.billing.payment.RequestProcessor;
+import com.ning.billing.payment.RetryService;
 import com.ning.billing.payment.api.DefaultPaymentApi;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
@@ -47,6 +48,10 @@ public class PaymentModule extends AbstractModule {
     protected void installPaymentProviderPlugins(PaymentConfig config) {
     }
 
+    protected void installRetryEngine() {
+        bind(RetryService.class).asEagerSingleton();
+    }
+
     @Override
     protected void configure() {
         final ConfigurationObjectFactory factory = new ConfigurationObjectFactory(props);
diff --git a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
index a62c366..6576ecb 100644
--- a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -24,6 +24,7 @@ paymentInfoFields(prefix) ::= <<
     <prefix>payment_type,
     <prefix>status,
     <prefix>reference_id,
+    <prefix>payment_method_id,
     <prefix>payment_method,
     <prefix>card_type,
     <prefix>card_country,
@@ -43,6 +44,18 @@ getPaymentAttemptForPaymentId() ::= <<
      WHERE payment_id = :payment_id
 >>
 
+getPaymentAttemptById() ::= <<
+    SELECT <paymentAttemptFields()>
+      FROM payment_attempts
+     WHERE payment_attempt_id = :payment_attempt_id
+>>
+
+getPaymentAttemptsForInvoiceIds(invoiceIds) ::= <<
+    SELECT <paymentAttemptFields()>
+      FROM payment_attempts
+     WHERE invoice_id in (<invoiceIds>)
+>>
+
 getPaymentAttemptForInvoiceId() ::= <<
     SELECT <paymentAttemptFields()>
       FROM payment_attempts
@@ -58,7 +71,7 @@ updatePaymentAttemptWithPaymentId() ::= <<
 
 insertPaymentInfo() ::= <<
     INSERT INTO payments (<paymentInfoFields()>)
-    VALUES (:payment_id, :amount, :refund_amount, :bank_identification_number, :payment_number, :payment_type, :status, :reference_id, :payment_method, :card_type, :card_country, :effective_dt, :created_dt, :updated_dt);
+    VALUES (:payment_id, :amount, :refund_amount, :bank_identification_number, :payment_number, :payment_type, :status, :reference_id, :payment_method_id, :payment_method, :card_type, :card_country, :effective_dt, :created_dt, :updated_dt);
 >>
 
 updatePaymentInfo() ::= <<
@@ -68,4 +81,19 @@ updatePaymentInfo() ::= <<
            card_country = :card_country,
            updated_dt = NOW()
      WHERE payment_id = :payment_id
+>>
+
+updatePaymentAttemptWithRetryInfo() ::= <<
+    UPDATE payment_attempts
+       SET retry_count = :retry_count,
+           next_retry_dt = :next_retry_dt,
+           updated_dt = NOW()
+     WHERE payment_attempt_id = :payment_attempt_id
+>>
+
+getPaymentInfos(invoiceIds) ::= <<
+    SELECT <paymentInfoFields("p.")>
+      FROM payments p, payment_attempts pa
+     WHERE pa.invoice_id in (<invoiceIds>)
+       AND pa.payment_id = p.payment_id
 >>
\ No newline at end of file
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index 8c9d623..2ef26e4 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -100,9 +100,19 @@ public abstract class TestPaymentApi {
         assertEquals(paymentAttempt.getPaymentId(), paymentInfo.getPaymentId());
         assertEquals(paymentAttempt.getPaymentAttemptDate().withMillisOfSecond(0).withSecondOfMinute(0), now.withMillisOfSecond(0).withSecondOfMinute(0));
 
+        List<PaymentInfo> paymentInfos = paymentApi.getPaymentInfo(Arrays.asList(invoice.getId().toString()));
+        assertNotNull(paymentInfos);
+        assertTrue(paymentInfos.size() > 0);
+
+        PaymentInfo paymentInfoFromGet = paymentInfos.get(0);
+        assertEquals(paymentInfo, paymentInfoFromGet);
+
+        PaymentAttempt paymentAttemptFromGet = paymentApi.getPaymentAttemptForInvoiceId(invoice.getId().toString());
+        assertEquals(paymentAttempt, paymentAttemptFromGet);
+
     }
 
-    private PaymentProviderAccount setupAccountWithPaymentMethod() throws AccountApiException {
+    private PaymentProviderAccount setupAccountWithPaypalPaymentMethod() throws AccountApiException {
         final Account account = testHelper.createTestPayPalAccount();
         paymentApi.createPaymentProviderAccount(account);
 
@@ -131,9 +141,10 @@ public abstract class TestPaymentApi {
     }
 
     @Test(enabled=true)
-    public void testCreatePaymentMethod() throws AccountApiException {
-        PaymentProviderAccount account = setupAccountWithPaymentMethod();
+    public void testCreatePaypalPaymentMethod() throws AccountApiException {
+        PaymentProviderAccount account = setupAccountWithPaypalPaymentMethod();
         assertNotNull(account);
+        Either<PaymentError, List<PaymentMethodInfo>> paymentMethodsOrError = paymentApi.getPaymentMethods(account.getAccountKey());
     }
 
     @Test(enabled=true)
@@ -160,7 +171,7 @@ public abstract class TestPaymentApi {
 
     @Test(enabled=true)
     public void testCannotDeleteDefaultPaymentMethod() throws AccountApiException {
-        PaymentProviderAccount account = setupAccountWithPaymentMethod();
+        PaymentProviderAccount account = setupAccountWithPaypalPaymentMethod();
 
         Either<PaymentError, Void> errorOrVoid = paymentApi.deletePaymentMethod(account.getAccountKey(), account.getDefaultPaymentMethodId());
 
@@ -197,4 +208,5 @@ public abstract class TestPaymentApi {
         assertTrue(errorOrVoid1.isRight());
         assertTrue(errorOrVoid2.isLeft());
     }
+
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index bb83927..b245e55 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -16,10 +16,16 @@
 
 package com.ning.billing.payment.dao;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.joda.time.DateTime;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
@@ -46,6 +52,12 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
+    public PaymentAttempt createPaymentAttempt(PaymentAttempt paymentAttempt) {
+        paymentAttempts.put(paymentAttempt.getPaymentAttemptId(), paymentAttempt);
+        return paymentAttempt;
+    }
+
+    @Override
     public void savePaymentInfo(PaymentInfo paymentInfo) {
         payments.put(paymentInfo.getPaymentId(), paymentInfo);
     }
@@ -63,7 +75,7 @@ public class MockPaymentDao implements PaymentDao {
     @Override
     public PaymentAttempt getPaymentAttemptForInvoiceId(String invoiceId) {
         for (PaymentAttempt paymentAttempt : paymentAttempts.values()) {
-            if (invoiceId.equals(paymentAttempt.getInvoiceId())) {
+            if (invoiceId.equals(paymentAttempt.getInvoiceId().toString())) {
                 return paymentAttempt;
             }
         }
@@ -72,8 +84,59 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public void updatePaymentInfo(String paymentMethodType, String paymentId, String cardType, String cardCountry) {
-        // TODO Auto-generated method stub
+        PaymentInfo existingPayment = payments.get(paymentId);
+        if (existingPayment != null) {
+            PaymentInfo payment = existingPayment.cloner().setPaymentMethod(paymentMethodType).setCardType(cardType).setCardCountry(cardCountry).build();
+            payments.put(paymentId, payment);
+        }
+    }
+
+    @Override
+    public List<PaymentInfo> getPaymentInfo(List<String> invoiceIds) {
+        List<PaymentAttempt> attempts = getPaymentAttemptsForInvoiceIds(invoiceIds);
+        List<PaymentInfo> paymentsToReturn = new ArrayList<PaymentInfo>(invoiceIds.size());
 
+        for (final PaymentAttempt attempt : attempts) {
+            paymentsToReturn.addAll(Collections2.filter(payments.values(), new Predicate<PaymentInfo>() {
+                @Override
+                public boolean apply(PaymentInfo input) {
+                    return input.getPaymentId().equals(attempt.getPaymentId());
+                }
+            }));
+        }
+        return paymentsToReturn;
+    }
+
+    @Override
+    public List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(List<String> invoiceIds) {
+        List<PaymentAttempt> paymentAttempts = new ArrayList<PaymentAttempt>(invoiceIds.size());
+        for (String invoiceId : invoiceIds) {
+            PaymentAttempt attempt = getPaymentAttemptForInvoiceId(invoiceId);
+            if (attempt != null) {
+                paymentAttempts.add(attempt);
+            }
+        }
+        return paymentAttempts;
+    }
+
+    @Override
+    public void updatePaymentAttemptWithRetryInfo(UUID paymentAttemptId, int retryCount, DateTime nextRetryDate) {
+        PaymentAttempt existingAttempt = paymentAttempts.get(paymentAttemptId);
+        if (existingAttempt != null) {
+            PaymentAttempt attempt = existingAttempt.cloner().setPaymentAttemptId(paymentAttemptId).setRetryCount(retryCount).setNextRetryDate(nextRetryDate).build();
+            paymentAttempts.put(paymentAttemptId, attempt);
+        }
+    }
+
+    @Override
+    public PaymentAttempt getPaymentAttemptById(UUID paymentAttemptId) {
+        return paymentAttempts.get(paymentAttemptId);
+    }
+
+    @Override
+    public PaymentInfo getPaymentInfoForPaymentAttemptId(String paymentAttemptId) {
+        // TODO Auto-generated method stub
+        return null;
     }
 
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index 6c57c77..18b0a15 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -17,17 +17,22 @@
 package com.ning.billing.payment.dao;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
 
 public abstract class TestPaymentDao {
 
-    protected PaymentDao dao;
+    protected PaymentDao paymentDao;
 
     @Test
     public void testCreatePayment() {
@@ -44,7 +49,7 @@ public abstract class TestPaymentDao {
                                                            .setEffectiveDate(new DateTime(DateTimeZone.UTC))
                                                            .build();
 
-        dao.savePaymentInfo(paymentInfo);
+        paymentDao.savePaymentInfo(paymentInfo);
     }
 
     @Test
@@ -62,10 +67,55 @@ public abstract class TestPaymentDao {
                                                            .setEffectiveDate(new DateTime(DateTimeZone.UTC))
                                                            .build();
 
-        dao.savePaymentInfo(paymentInfo);
+        paymentDao.savePaymentInfo(paymentInfo);
 
-        dao.updatePaymentInfo("CreditCard", paymentInfo.getPaymentId(), "Visa", "US");
+        paymentDao.updatePaymentInfo("CreditCard", paymentInfo.getPaymentId(), "Visa", "US");
 
     }
 
+    @Test
+    public void testGetPaymentForInvoice() throws AccountApiException {
+        final UUID invoiceId = UUID.randomUUID();
+        final UUID paymentAttemptId = UUID.randomUUID();
+        final UUID accountId = UUID.randomUUID();
+        final String paymentId = UUID.randomUUID().toString();
+        final BigDecimal invoiceAmount = BigDecimal.TEN;
+
+        final DateTime now = new DateTime(DateTimeZone.UTC);
+
+        PaymentAttempt originalPaymenAttempt = new PaymentAttempt(paymentAttemptId, invoiceId, accountId, invoiceAmount, Currency.USD, now, now, paymentId, null, null);
+
+        PaymentAttempt attempt = paymentDao.createPaymentAttempt(originalPaymenAttempt);
+
+        PaymentAttempt attempt2 = paymentDao.getPaymentAttemptForInvoiceId(invoiceId.toString());
+
+        Assert.assertEquals(attempt, attempt2);
+
+        PaymentAttempt attempt3 = paymentDao.getPaymentAttemptsForInvoiceIds(Arrays.asList(invoiceId.toString())).get(0);
+
+        Assert.assertEquals(attempt, attempt3);
+
+        PaymentAttempt attempt4 = paymentDao.getPaymentAttemptById(attempt3.getPaymentAttemptId());
+
+        Assert.assertEquals(attempt3, attempt4);
+
+        PaymentInfo originalPaymentInfo = new PaymentInfo.Builder().setPaymentId(paymentId)
+                                                           .setAmount(invoiceAmount)
+                                                           .setStatus("Processed")
+                                                           .setBankIdentificationNumber("1234")
+                                                           .setPaymentNumber("12345")
+                                                           .setPaymentMethodId("12345")
+                                                           .setReferenceId("12345")
+                                                           .setType("Electronic")
+                                                           .setCreatedDate(now)
+                                                           .setUpdatedDate(now)
+                                                           .setEffectiveDate(now)
+                                                           .build();
+
+        paymentDao.savePaymentInfo(originalPaymentInfo);
+        PaymentInfo paymentInfo = paymentDao.getPaymentInfo(Arrays.asList(invoiceId.toString())).get(0);
+
+        Assert.assertEquals(originalPaymentInfo, paymentInfo);
+    }
+
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
index da48c03..19ca39d 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
@@ -26,28 +26,25 @@ import org.testng.annotations.Test;
 
 import com.ning.billing.dbi.MysqlTestingHelper;
 
-public class TestPaymentDaoWithEmbeddedDb
-{
-    @Test(enabled = true, groups = { "slow", "database" })
-    public class TestPaymentDaoWithEmbeddedDB extends TestPaymentDao {
-        private final MysqlTestingHelper helper = new MysqlTestingHelper();
-
-        @BeforeClass(alwaysRun = true)
-        public void startMysql() throws IOException {
-            final String paymentddl = IOUtils.toString(MysqlTestingHelper.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
-
-            helper.startMysql();
-            helper.initDb(paymentddl);
-        }
-
-        @AfterClass(alwaysRun = true)
-        public void stopMysql() {
-            helper.stopMysql();
-        }
-
-        @BeforeMethod(alwaysRun = true)
-        public void setUp() throws IOException {
-            dao = new DefaultPaymentDao(helper.getDBI());
-        }
+@Test(enabled = true, groups = { "slow", "database" })
+public class TestPaymentDaoWithEmbeddedDb extends TestPaymentDao {
+    private final MysqlTestingHelper helper = new MysqlTestingHelper();
+
+    @BeforeClass(alwaysRun = true)
+    public void startMysql() throws IOException {
+        final String paymentddl = IOUtils.toString(MysqlTestingHelper.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
+
+        helper.startMysql();
+        helper.initDb(paymentddl);
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void stopMysql() {
+        helper.stopMysql();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws IOException {
+        paymentDao = new DefaultPaymentDao(helper.getDBI());
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithMock.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithMock.java
index f5af240..6e31f90 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithMock.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithMock.java
@@ -25,6 +25,6 @@ import org.testng.annotations.Test;
 public class TestPaymentDaoWithMock extends TestPaymentDao {
     @BeforeMethod(alwaysRun = true)
     public void setUp() throws IOException {
-        dao = new MockPaymentDao();
+        paymentDao = new MockPaymentDao();
     }
 }
\ No newline at end of file
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
index 9d9372c..31fdae3 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
@@ -20,11 +20,23 @@ import com.ning.billing.util.bus.Bus;
 import org.apache.commons.collections.MapUtils;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.inject.Provider;
+import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
+import com.ning.billing.mock.BrainDeadProxyFactory;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
+import com.ning.billing.payment.setup.PaymentTestModuleWithMocks.MockProvider;
 import com.ning.billing.util.bus.InMemoryBus;
 
 public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
-    public PaymentTestModuleWithEmbeddedDb() {
+	public static class MockProvider implements Provider<EntitlementBillingApi> {
+		@Override
+		public EntitlementBillingApi get() {
+			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
+		}
+		
+	}
+	
+	public PaymentTestModuleWithEmbeddedDb() {
         super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
     }
 
@@ -37,5 +49,6 @@ public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
     protected void configure() {
         super.configure();
         bind(Bus.class).to(InMemoryBus.class).asEagerSingleton();
+        bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
index 144afa4..3dfc637 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
@@ -16,21 +16,32 @@
 
 package com.ning.billing.payment.setup;
 
-import com.ning.billing.util.bus.InMemoryBus;
 import org.apache.commons.collections.MapUtils;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.inject.Provider;
 import com.ning.billing.account.dao.AccountDao;
 import com.ning.billing.account.dao.MockAccountDao;
+import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.invoice.dao.InvoiceDao;
 import com.ning.billing.invoice.dao.MockInvoiceDao;
-
+import com.ning.billing.mock.BrainDeadProxyFactory;
 import com.ning.billing.payment.dao.MockPaymentDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
 import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.InMemoryBus;
 
 public class PaymentTestModuleWithMocks extends PaymentModule {
+	public static class MockProvider implements Provider<EntitlementBillingApi> {
+		@Override
+		public EntitlementBillingApi get() {
+			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
+		}
+		
+	}
+	
+	
     public PaymentTestModuleWithMocks() {
         super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
     }
@@ -53,5 +64,6 @@ public class PaymentTestModuleWithMocks extends PaymentModule {
         bind(AccountDao.class).to(MockAccountDao.class);
         bind(MockInvoiceDao.class).asEagerSingleton();
         bind(InvoiceDao.class).to(MockInvoiceDao.class);
+        bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index ac05da4..4430eb2 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -53,7 +53,7 @@ public class TestHelper {
                                                                      .firstNameLength(name.length())
                                                                      .externalKey(externalKey)
                                                                      .phone("123-456-7890")
-                                                                     .email("ccuser@example.com")
+                                                                     .email("ccuser" + RandomStringUtils.randomAlphanumeric(8) + "@example.com")
                                                                      .currency(Currency.USD)
                                                                      .billingCycleDay(1)
                                                                      .build();
diff --git a/payment/src/test/java/com/ning/billing/payment/TestNotifyInvoicePaymentApi.java b/payment/src/test/java/com/ning/billing/payment/TestNotifyInvoicePaymentApi.java
index 80de67f..e691f25 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestNotifyInvoicePaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestNotifyInvoicePaymentApi.java
@@ -20,7 +20,6 @@ import static org.testng.Assert.assertNotNull;
 
 import java.util.UUID;
 
-import com.ning.billing.invoice.api.InvoicePayment;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
@@ -31,8 +30,10 @@ import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.account.glue.AccountModuleWithMocks;
 import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
 import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
+import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.setup.PaymentTestModuleWithMocks;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.Bus.EventBusException;
diff --git a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
index 8768117..c1e62d8 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
@@ -90,7 +90,7 @@ public class TestPaymentInvoiceIntegration {
 
     @AfterClass(alwaysRun = true)
     public void stopMysql() {
-        helper.stopMysql();
+        if (helper != null) helper.stopMysql();
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -116,9 +116,11 @@ public class TestPaymentInvoiceIntegration {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() throws EventBusException {
-        eventBus.unregister(invoiceProcessor);
-        eventBus.unregister(paymentInfoReceiver);
-        eventBus.stop();
+        if (eventBus != null) {
+            eventBus.unregister(invoiceProcessor);
+            eventBus.unregister(paymentInfoReceiver);
+            eventBus.stop();
+        }
     }
 
     @Test
diff --git a/payment/src/test/java/com/ning/billing/payment/TestPaymentProvider.java b/payment/src/test/java/com/ning/billing/payment/TestPaymentProvider.java
index 2c0aa13..99870b7 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestPaymentProvider.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestPaymentProvider.java
@@ -18,7 +18,6 @@ package com.ning.billing.payment;
 
 import static com.jayway.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -90,23 +89,5 @@ public class TestPaymentProvider {
         assertFalse(paymentInfoReceiver.getProcessedPayments().isEmpty());
         assertTrue(paymentInfoReceiver.getErrors().isEmpty());
 
-        final PaymentInfo paymentInfo = paymentInfoReceiver.getProcessedPayments().get(0);
-        final PaymentInfoRequest paymentInfoRequest = new PaymentInfoRequest(account.getId(), paymentInfo.getPaymentId());
-
-        paymentInfoReceiver.clear();
-        eventBus.post(paymentInfoRequest);
-        await().atMost(5, MINUTES).until(new Callable<Boolean>() {
-            @Override
-            public Boolean call() throws Exception {
-                List<PaymentInfo> processedPayments = paymentInfoReceiver.getProcessedPayments();
-                List<PaymentError> errors = paymentInfoReceiver.getErrors();
-
-                return processedPayments.size() == 1 || errors.size() == 1;
-            }
-        });
-
-        assertFalse(paymentInfoReceiver.getProcessedPayments().isEmpty());
-        assertTrue(paymentInfoReceiver.getErrors().isEmpty());
-        assertEquals(paymentInfoReceiver.getProcessedPayments().get(0), paymentInfo);
     }
 }

pom.xml 4(+2 -2)

diff --git a/pom.xml b/pom.xml
index 179d4c3..c961bf5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,7 @@
     <groupId>com.ning.billing</groupId>
     <artifactId>killbill</artifactId>
     <packaging>pom</packaging>
-    <version>0.1.6-SNAPSHOT</version>
+    <version>0.1.7-SNAPSHOT</version>
     <name>killbill</name>
     <description>Library for managing recurring subscriptions and the associated billing</description>
     <url>http://github.com/ning/killbill</url>
@@ -436,7 +436,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.6</version>
+                <version>2.11</version>
                 <configuration>
                     <useManifestOnlyJar>false</useManifestOnlyJar>
                     <systemPropertyVariables>

util/pom.xml 2(+1 -1)

diff --git a/util/pom.xml b/util/pom.xml
index 68648ef..cb834db 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -13,7 +13,7 @@
     <parent>
         <groupId>com.ning.billing</groupId>
         <artifactId>killbill</artifactId>
-        <version>0.1.6-SNAPSHOT</version>
+        <version>0.1.7-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-util</artifactId>
diff --git a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
index 14c123a..a766982 100644
--- a/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
+++ b/util/src/main/java/com/ning/billing/util/customfield/dao/FieldStoreDao.java
@@ -48,8 +48,8 @@ public interface FieldStoreDao extends EntityCollectionDao<CustomField>, Transac
     @Override
     @SqlBatch(transactional=false)
     public void batchSaveFromTransaction(@Bind("objectId") final String objectId,
-                     @Bind("objectType") final String objectType,
-                     @CustomFieldBinder final List<CustomField> entities);
+                                         @Bind("objectType") final String objectType,
+                                         @CustomFieldBinder final List<CustomField> entities);
 
 
     public class CustomFieldMapper implements ResultSetMapper<CustomField> {
diff --git a/util/src/main/java/com/ning/billing/util/glue/TagStoreModule.java b/util/src/main/java/com/ning/billing/util/glue/TagStoreModule.java
index 10651be..a598275 100644
--- a/util/src/main/java/com/ning/billing/util/glue/TagStoreModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/TagStoreModule.java
@@ -18,8 +18,6 @@ package com.ning.billing.util.glue;
 
 import com.google.inject.AbstractModule;
 import com.ning.billing.util.api.TagDefinitionUserApi;
-import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.clock.DefaultClock;
 import com.ning.billing.util.tag.api.DefaultTagDefinitionUserApi;
 import com.ning.billing.util.tag.dao.DefaultTagDefinitionDao;
 import com.ning.billing.util.tag.dao.TagDefinitionDao;
@@ -28,13 +26,16 @@ import com.ning.billing.util.tag.dao.TagStoreSqlDao;
 
 public class TagStoreModule extends AbstractModule
 {
-    @Override
-    protected void configure()
-    {
+    protected void installDaos() {
         bind(TagDefinitionSqlDao.class).toProvider(TagDescriptionDaoProvider.class).asEagerSingleton();
         bind(TagDefinitionDao.class).to(DefaultTagDefinitionDao.class).asEagerSingleton();
         bind(TagStoreSqlDao.class).toProvider(TagStoreDaoProvider.class).asEagerSingleton();
+    }
+
+    @Override
+    protected void configure()
+    {
+        installDaos();
         bind(TagDefinitionUserApi.class).to(DefaultTagDefinitionUserApi.class).asEagerSingleton();
     }
-    
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index cc1ea28..a4f4c97 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -182,6 +182,11 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         waitForNotificationStartCompletion();
     }
 
+    @Override
+    public String toString() {
+        return getFullQName();
+    }
+
     private void completedQueueStop() {
     	synchronized (this) {
     		stoppedComplete = true;
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
index 57ca679..baaf9cd 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
@@ -46,7 +46,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
 
         // add control tag definitions
         for (ControlTagType controlTag : ControlTagType.values()) {
-            definitionList.add(new DefaultTagDefinition(controlTag.toString(), controlTag.getDescription(), null, null));
+            definitionList.add(new DefaultTagDefinition(controlTag.toString(), controlTag.getDescription(), null));
         }
 
         return definitionList;
@@ -69,7 +69,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
             throw new TagDefinitionApiException(ErrorCode.TAG_DEFINITION_ALREADY_EXISTS, definitionName);
         }
 
-        TagDefinition definition = new DefaultTagDefinition(definitionName, description, createdBy, clock.getUTCNow());
+        TagDefinition definition = new DefaultTagDefinition(definitionName, description, createdBy);
         dao.create(definition);
         return definition;
     }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.java
index f13daff..b2e7dab 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.java
@@ -70,8 +70,7 @@ public interface TagDefinitionSqlDao extends EntityDao<TagDefinition> {
             String name = result.getString("name");
             String description = result.getString("description");
             String createdBy = result.getString("created_by");
-            DateTime creationDate = new DateTime(result.getTimestamp("creation_date"));
-            return new DefaultTagDefinition(id, name, description, createdBy, creationDate);
+            return new DefaultTagDefinition(id, name, description, createdBy);
         }
     }
 
@@ -86,7 +85,6 @@ public interface TagDefinitionSqlDao extends EntityDao<TagDefinition> {
                         q.bind("id", tagDefinition.getId().toString());
                         q.bind("name", tagDefinition.getName());
                         q.bind("createdBy", tagDefinition.getCreatedBy());
-                        q.bind("creationDate", tagDefinition.getCreationDate().toDate());
                         q.bind("description", tagDefinition.getDescription());
                     }
                 };
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/TagMapper.java b/util/src/main/java/com/ning/billing/util/tag/dao/TagMapper.java
index 1296083..fadf98c 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/TagMapper.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/TagMapper.java
@@ -45,10 +45,9 @@ public class TagMapper implements ResultSetMapper<Tag> {
         } catch (Throwable t) {
             String description = result.getString("tag_description");
             String createdBy = result.getString("created_by");
-            DateTime creationDate = new DateTime(result.getDate("creation_date"));
 
             UUID tagDefinitionId = UUID.fromString(result.getString("tag_definition_id"));
-            TagDefinition tagDefinition = new DefaultTagDefinition(tagDefinitionId, name, description, createdBy, creationDate);
+            TagDefinition tagDefinition = new DefaultTagDefinition(tagDefinitionId, name, description, createdBy);
             tag = new DescriptiveTag(id, tagDefinition, addedBy, addedDate);
         }
 
diff --git a/util/src/main/java/com/ning/billing/util/tag/DefaultTagDefinition.java b/util/src/main/java/com/ning/billing/util/tag/DefaultTagDefinition.java
index bfce619..482e56d 100644
--- a/util/src/main/java/com/ning/billing/util/tag/DefaultTagDefinition.java
+++ b/util/src/main/java/com/ning/billing/util/tag/DefaultTagDefinition.java
@@ -17,27 +17,24 @@
 package com.ning.billing.util.tag;
 
 import java.util.UUID;
-import org.joda.time.DateTime;
 import com.ning.billing.util.entity.EntityBase;
 
 public class DefaultTagDefinition extends EntityBase implements TagDefinition {
     private String name;
     private String description;
     private String createdBy;
-    private DateTime creationDate;
 
     public DefaultTagDefinition(String name, String description,
-                                String createdBy, DateTime creationDate) {
-        this(UUID.randomUUID(), name, description, createdBy, creationDate);
+                                String createdBy) {
+        this(UUID.randomUUID(), name, description, createdBy);
     }
 
     public DefaultTagDefinition(UUID id, String name, String description,
-                                String createdBy, DateTime creationDate) {
+                                String createdBy) {
         super(id);
         this.name = name;
         this.description = description;
         this.createdBy = createdBy;
-        this.creationDate = creationDate;
     }
     
     @Override
@@ -51,11 +48,6 @@ public class DefaultTagDefinition extends EntityBase implements TagDefinition {
     }
 
     @Override
-    public DateTime getCreationDate() {
-        return creationDate;
-    }
-
-    @Override
     public String getDescription() {
         return description;
     }
diff --git a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
index 9d3e96e..c9ee046 100644
--- a/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/customfield/dao/FieldStoreDao.sql.stg
@@ -1,8 +1,8 @@
 group FieldStoreDao;
 
 batchSaveFromTransaction() ::= <<
-  INSERT INTO custom_fields(id, object_id, object_type, field_name, field_value)
-  VALUES (:id, :objectId, :objectType, :fieldName, :fieldValue)
+  INSERT INTO custom_fields(id, object_id, object_type, field_name, field_value, created_date, updated_date)
+  VALUES (:id, :objectId, :objectType, :fieldName, :fieldValue, NOW(), NOW())
   ON DUPLICATE KEY UPDATE
     field_value = :fieldValue;
 >>
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index a0ef302..4ae95d7 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -5,23 +5,80 @@ CREATE TABLE custom_fields (
   object_type varchar(30) NOT NULL,
   field_name varchar(30) NOT NULL,
   field_value varchar(255) NOT NULL,
+  created_date datetime NOT NULL,
+  updated_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE INDEX custom_fields_object_id_object_type ON custom_fields(object_id, object_type);
 CREATE UNIQUE INDEX custom_fields_unique ON custom_fields(object_id, object_type, field_name);
 
+DROP TABLE IF EXISTS custom_field_history;
+CREATE TABLE custom_field_history (
+  id char(36) NOT NULL,
+  object_id char(36) NOT NULL,
+  object_type varchar(30) NOT NULL,
+  field_name varchar(30),
+  field_value varchar(255),
+  date datetime NOT NULL,
+  change_type char(6) NOT NULL
+) ENGINE=innodb;
+CREATE INDEX custom_field_history_object_id_object_type ON custom_fields(object_id, object_type);
+
+CREATE TRIGGER store_custom_field_history_on_insert AFTER INSERT ON custom_fields
+    FOR EACH ROW
+        INSERT INTO custom_field_history (id, object_id, object_type, field_name, field_value, date, change_type)
+        VALUES (NEW.id, NEW.object_id, NEW.object_type, NEW.field_name, NEW.field_value, NOW(), 'CREATE');
+
+CREATE TRIGGER store_custom_field_history_on_update AFTER UPDATE ON custom_fields
+    FOR EACH ROW
+        INSERT INTO custom_field_history (id, object_id, object_type, field_name, field_value, date, change_type)
+        VALUES (NEW.id, NEW.object_id, NEW.object_type, NEW.field_name, NEW.field_value, NOW(), 'UPDATE');
+
+CREATE TRIGGER store_custom_field_history_on_delete BEFORE DELETE ON custom_fields
+    FOR EACH ROW
+        INSERT INTO custom_field_history (id, object_id, object_type, field_name, field_value, date, change_type)
+        VALUES (OLD.id, OLD.object_id, OLD.object_type, NULL, NULL, NOW(), 'DELETE');
+
 DROP TABLE IF EXISTS tag_descriptions;
 DROP TABLE IF EXISTS tag_definitions;
 CREATE TABLE tag_definitions (
   id char(36) NOT NULL,
   name varchar(20) NOT NULL,
   created_by varchar(50) NOT NULL,
-  creation_date datetime NOT NULL,
   description varchar(200) NOT NULL,
+  created_date datetime NOT NULL,
+  updated_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE UNIQUE INDEX tag_definitions_name ON tag_definitions(name);
 
+DROP TABLE IF EXISTS tag_definition_history;
+CREATE TABLE tag_definition_history (
+  id char(36) NOT NULL,
+  name varchar(20) NOT NULL,
+  created_by varchar(50),
+  description varchar(200),
+  date datetime NOT NULL,
+  change_type char(6) NOT NULL
+) ENGINE=innodb;
+CREATE INDEX tag_definition_history_id ON tag_definition_history(id);
+CREATE INDEX tag_definition_history_name ON tag_definition_history(name);
+
+CREATE TRIGGER tag_definition_history_after_insert AFTER INSERT ON tag_definition_history
+    FOR EACH ROW
+        INSERT INTO tag_definition_history (id, name, created_by, description, date, change_type)
+        VALUES (NEW.id, NEW.name, NEW.created_by, NEW.description, NOW(), 'CREATE');
+
+CREATE TRIGGER tag_definition_history_after_update AFTER UPDATE ON tag_definition_history
+    FOR EACH ROW
+        INSERT INTO tag_definition_history (id, name, created_by, description, date, change_type)
+        VALUES (NEW.id, NEW.name, NEW.created_by, NEW.description, NOW(), 'UPDATE');
+
+CREATE TRIGGER tag_definition_history_before_delete BEFORE DELETE ON tag_definition_history
+    FOR EACH ROW
+        INSERT INTO tag_definition_history (id, name, created_by, description, date, change_type)
+        VALUES (OLD.id, OLD.name, NULL, NULL, NOW(), 'DELETE');
+
 DROP TABLE IF EXISTS tags;
 CREATE TABLE tags (
   id char(36) NOT NULL,
@@ -35,6 +92,32 @@ CREATE TABLE tags (
 CREATE INDEX tags_by_object ON tags(object_id);
 CREATE UNIQUE INDEX tags_unique ON tags(tag_definition_name, object_id);
 
+DROP TABLE IF EXISTS tag_history;
+CREATE TABLE tag_history (
+  id char(36) NULL,
+  tag_definition_name varchar(20) NOT NULL,
+  object_id char(36) NOT NULL,
+  object_type varchar(30) NOT NULL,
+  date datetime NOT NULL,
+  change_type char(6) NOT NULL
+) ENGINE = innodb;
+CREATE INDEX tag_history_by_object ON tags(object_id);
+
+CREATE TRIGGER tag_history_after_insert AFTER INSERT ON tag_history
+    FOR EACH ROW
+        INSERT INTO tag_history (id, tag_definition_name, object_id, object_type, date, change_type)
+        VALUES (NEW.id, NEW.tag_definition_name, NEW.object_id, NEW.object_type, NOW(), 'CREATE');
+
+CREATE TRIGGER tag_history_after_update AFTER UPDATE ON tag_history
+    FOR EACH ROW
+        INSERT INTO tag_history (id, tag_definition_name, object_id, object_type, date, change_type)
+        VALUES (NEW.id, NEW.tag_definition_name, NEW.object_id, NEW.object_type, NOW(), 'UPDATE');
+
+CREATE TRIGGER tag_history_before_delete BEFORE DELETE ON tag_history
+    FOR EACH ROW
+        INSERT INTO tag_history (id, tag_definition_name, object_id, object_type, date, change_type)
+        VALUES (OLD.id, OLD.tag_definition_name, OLD.object_id, OLD.object_type, NOW(), 'DELETE');
+
 DROP TABLE IF EXISTS notifications;
 CREATE TABLE notifications (
     id int(11) unsigned NOT NULL AUTO_INCREMENT,
@@ -60,4 +143,4 @@ CREATE TABLE claimed_notifications (
     claimed_dt datetime NOT NULL,
     notification_id char(36) NOT NULL,
     PRIMARY KEY(id)
-) ENGINE=innodb;
+) ENGINE=innodb;
\ No newline at end of file
diff --git a/util/src/main/resources/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
index 72268d0..333da26 100644
--- a/util/src/main/resources/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
@@ -1,24 +1,32 @@
 group TagDefinitionDao;
 
+fields(prefix) ::= <<
+    <prefix>id,
+    <prefix>name,
+    <prefix>created_by,
+    <prefix>description,
+    <prefix>created_date,
+    <prefix>updated_date
+>>
+
 get() ::= <<
-  SELECT id, name, created_by, creation_date, description
+  SELECT <fields()>
   FROM tag_definitions;
 >>
 
 create() ::= <<
-  INSERT INTO tag_definitions(id, name, created_by, creation_date, description)
-  VALUES(:id, :name, :createdBy, :creationDate, :description);
+  INSERT INTO tag_definitions(<fields()>)
+  VALUES(:id, :name, :createdBy, :description, NOW(), NOW());
 >>
 
 update() ::= <<
   UPDATE tag_definitions
-  SET name = :name, created_by = :createdBy, creation_date = :creationDate,
-      description = :description)
+  SET name = :name, created_by = :createdBy, description = :description, updated_date = NOW())
   WHERE id = :id;
 >>
 
 load() ::= <<
-  SELECT id, name, created_by, creation_date, description
+  SELECT <fields()>
   FROM tag_definitions
   WHERE id = :id;
 >>
@@ -40,7 +48,7 @@ tagDefinitionUsageCount() ::= <<
 >>
 
 getByName() ::= <<
-  SELECT id, name, created_by, creation_date, description
+  SELECT <fields()>
   FROM tag_definitions
   WHERE name = :name;
 >>
diff --git a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
index 9d7ce5c..4cad04c 100644
--- a/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/tag/dao/TagStoreSqlDao.sql.stg
@@ -13,7 +13,7 @@ load() ::= <<
            td.id AS tag_definition_id,
            t.tag_definition_name AS tag_definition_name,
            td.description AS tag_description,
-           td.created_by, td.creation_date
+           td.created_by
     FROM tags t
     LEFT JOIN tag_definitions td ON t.tag_definition_name = td.name
     WHERE t.object_id = :objectId AND t.object_type = :objectType;
diff --git a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
index 5ee7e88..93814f3 100644
--- a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
+++ b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
@@ -46,7 +46,7 @@ public class MysqlTestingHelper
 
     private static final String DB_NAME = "test_killbill";
     private static final String USERNAME = "root";
-    private static final String PASSWORD = "";
+    private static final String PASSWORD = "root";
 
     private File dbDir;
     private MysqldResource mysqldResource;
@@ -77,7 +77,6 @@ public class MysqlTestingHelper
 
     public void startMysql() throws IOException
     {
-
         if (isUsingLocalInstance()) {
             return;
         }
@@ -90,6 +89,7 @@ public class MysqlTestingHelper
         final Map<String, String> dbOpts = new HashMap<String, String>();
         dbOpts.put(MysqldResourceI.PORT, Integer.toString(port));
         dbOpts.put(MysqldResourceI.INITIALIZE_USER, "true");
+        dbOpts.put(MysqldResourceI.INITIALIZE_PASSWORD, PASSWORD);
         dbOpts.put(MysqldResourceI.INITIALIZE_USER_NAME, USERNAME);
         dbOpts.put("default-time-zone", "+00:00");
 
diff --git a/util/src/test/java/com/ning/billing/mock/BrainDeadProxyFactory.java b/util/src/test/java/com/ning/billing/mock/BrainDeadProxyFactory.java
new file mode 100644
index 0000000..31dcd30
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/mock/BrainDeadProxyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.mock;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrainDeadProxyFactory {
+	private static final Logger log = LoggerFactory.getLogger(BrainDeadProxyFactory.class);
+
+	public static interface ZombieControl {
+		
+		public ZombieControl addResult(String method, Object result);
+		
+		public ZombieControl clearResults();
+		
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <T> T createBrainDeadProxyFor(final Class<T> clazz) {
+		return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
+                new Class[] { clazz , ZombieControl.class},
+                new InvocationHandler() {
+					private Map<String,Object> results = new HashMap<String,Object>();
+			
+					@Override
+					public Object invoke(Object proxy, Method method, Object[] args)
+							throws Throwable {
+						
+						if(method.getDeclaringClass().equals(ZombieControl.class)) {
+							if(method.getName().equals("addResult")) {
+								results.put((String) args[0], args[1]);
+								return proxy;
+							} else if(method.getName().equals("clearResults")) {
+								results.clear();
+								return proxy;
+							}
+
+						} else {
+							
+							Object result = results.get(method.getName());
+							if (result != null) {
+								return result;
+							} else {
+								log.error(String.format("No result for Method: '%s' on Class '%s'",method.getName(), method.getDeclaringClass().getName()));
+								throw new UnsupportedOperationException();
+							}
+						}
+						return (Void) null;
+					}
+				});
+	}
+}
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index ad3e5d3..8787cd1 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -150,7 +150,7 @@ public class ClockMock extends DefaultClock {
     }
 
     private DateTime adjustFromAbsolute(DateTime input) {
-        return input.plus(deltaFromRealityMs);
+        return truncateMs(input.plus(deltaFromRealityMs));
     }
 
 }
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDefinitionDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDefinitionDao.java
new file mode 100644
index 0000000..425020e
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/MockTagDefinitionDao.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.tag.dao;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.ning.billing.util.api.TagDefinitionApiException;
+import com.ning.billing.util.tag.DefaultTagDefinition;
+import com.ning.billing.util.tag.TagDefinition;
+
+public class MockTagDefinitionDao implements TagDefinitionDao {
+    private final Map<String, TagDefinition> tags = new ConcurrentHashMap<String, TagDefinition>();
+
+    @Override
+    public List<TagDefinition> getTagDefinitions() {
+        return new ArrayList<TagDefinition>(tags.values());
+    }
+
+    @Override
+    public TagDefinition getByName(String definitionName) {
+        return tags.get(definitionName);
+    }
+
+    @Override
+    public TagDefinition create(String definitionName, String description, String createdBy) throws TagDefinitionApiException {
+        TagDefinition tag = new DefaultTagDefinition(UUID.randomUUID(), definitionName, description, createdBy);
+
+        tags.put(definitionName, tag);
+        return tag;
+    }
+
+    @Override
+    public void deleteAllTagsForDefinition(String definitionName) throws TagDefinitionApiException {
+        tags.remove(definitionName);
+    }
+
+    @Override
+    public void deleteTagDefinition(String definitionName) throws TagDefinitionApiException {
+        tags.remove(definitionName);
+    }
+}