killbill-uncached

Changes

analytics/pom.xml 10(+10 -0)

Details

analytics/pom.xml 10(+10 -0)

diff --git a/analytics/pom.xml b/analytics/pom.xml
index 10f94d4..ad46c31 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -89,6 +89,16 @@
         </dependency>
         <dependency>
             <groupId>com.ning.billing</groupId>
+            <artifactId>killbill-invoice</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing</groupId>
+            <artifactId>killbill-payment</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing</groupId>
             <artifactId>killbill-util</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index 1735062..ba9dc2a 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -22,22 +22,22 @@ import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.account.api.AccountChangeNotification;
 import com.ning.billing.account.api.AccountCreationNotification;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
+import com.ning.billing.invoice.api.InvoiceCreationNotification;
+import com.ning.billing.payment.api.PaymentError;
+import com.ning.billing.payment.api.PaymentInfo;
 
-public class AnalyticsListener
-{
+public class AnalyticsListener {
     private final BusinessSubscriptionTransitionRecorder bstRecorder;
     private final BusinessAccountRecorder bacRecorder;
 
     @Inject
-    public AnalyticsListener(final BusinessSubscriptionTransitionRecorder bstRecorder, final BusinessAccountRecorder bacRecorder)
-    {
+    public AnalyticsListener(final BusinessSubscriptionTransitionRecorder bstRecorder, final BusinessAccountRecorder bacRecorder) {
         this.bstRecorder = bstRecorder;
         this.bacRecorder = bacRecorder;
     }
 
     @Subscribe
-    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException
-    {
+    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException {
         switch (event.getTransitionType()) {
             case MIGRATE_ENTITLEMENT:
                 // TODO do nothing for now
@@ -68,18 +68,31 @@ public class AnalyticsListener
     }
 
     @Subscribe
-    public void handleAccountCreation(final AccountCreationNotification event)
-    {
+    public void handleAccountCreation(final AccountCreationNotification event) {
         bacRecorder.accountCreated(event.getData());
     }
 
     @Subscribe
-    public void handleAccountChange(final AccountChangeNotification event)
-    {
+    public void handleAccountChange(final AccountChangeNotification event) {
         if (!event.hasChanges()) {
             return;
         }
 
         bacRecorder.accountUpdated(event.getAccountId(), event.getChangedFields());
     }
+
+    @Subscribe
+    public void handleInvoice(final InvoiceCreationNotification event) {
+        bacRecorder.accountUpdated(event.getAccountId());
+    }
+
+    @Subscribe
+    public void handlePaymentInfo(final PaymentInfo paymentInfo) {
+        bacRecorder.accountUpdated(paymentInfo);
+    }
+
+    @Subscribe
+    public void handlePaymentError(final PaymentError paymentError) {
+        // TODO - we can't tie the error back to an account yet
+    }
 }
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
index f7081c7..6832a2b 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
@@ -22,57 +22,169 @@ import com.ning.billing.account.api.AccountData;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.account.api.ChangedField;
 import com.ning.billing.analytics.dao.BusinessAccountDao;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceUserApi;
+import com.ning.billing.payment.api.PaymentApi;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
 import com.ning.billing.util.tag.Tag;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-public class BusinessAccountRecorder
-{
+public class BusinessAccountRecorder {
     private static final Logger log = LoggerFactory.getLogger(BusinessAccountRecorder.class);
 
     private final BusinessAccountDao dao;
     private final AccountUserApi accountApi;
+    private final InvoiceUserApi invoiceUserApi;
+    private final PaymentApi paymentApi;
 
     @Inject
-    public BusinessAccountRecorder(final BusinessAccountDao dao, final AccountUserApi accountApi)
-    {
+    public BusinessAccountRecorder(final BusinessAccountDao dao, final AccountUserApi accountApi, final InvoiceUserApi invoiceUserApi, final PaymentApi paymentApi) {
         this.dao = dao;
         this.accountApi = accountApi;
+        this.invoiceUserApi = invoiceUserApi;
+        this.paymentApi = paymentApi;
     }
 
-    public void accountCreated(final AccountData data)
-    {
+    public void accountCreated(final AccountData data) {
         final Account account = accountApi.getAccountByKey(data.getExternalKey());
+        final BusinessAccount bac = createBusinessAccountFromAccount(account);
 
+        log.info("ACCOUNT CREATION " + bac);
+        dao.createAccount(bac);
+    }
+
+    /**
+     * Notification handler for Account changes
+     *
+     * @param accountId     account id changed
+     * @param changedFields list of changed fields
+     */
+    public void accountUpdated(final UUID accountId, final List<ChangedField> changedFields) {
+        // None of the fields updated interest us so far - see DefaultAccountChangeNotification
+        // TODO We'll need notifications for tags changes eventually
+    }
+
+    /**
+     * Notification handler for Payment creations
+     *
+     * @param paymentInfo payment object (from the payment plugin)
+     */
+    public void accountUpdated(final PaymentInfo paymentInfo) {
+        final PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForPaymentId(paymentInfo.getPaymentId());
+        if (paymentAttempt == null) {
+            return;
+        }
+
+        final Account account = accountApi.getAccountById(paymentAttempt.getAccountId());
+        if (account == null) {
+            return;
+        }
+
+        accountUpdated(account.getId());
+    }
+
+    /**
+     * Notification handler for Invoice creations
+     *
+     * @param accountId account id associated with the created invoice
+     */
+    public void accountUpdated(final UUID accountId) {
+        final Account account = accountApi.getAccountById(accountId);
+
+        if (account == null) {
+            log.warn("Couldn't find account {}", accountId);
+            return;
+        }
+
+        BusinessAccount bac = dao.getAccount(account.getExternalKey());
+        if (bac == null) {
+            bac = createBusinessAccountFromAccount(account);
+            log.info("ACCOUNT CREATION " + bac);
+            dao.createAccount(bac);
+        } else {
+            updateBusinessAccountFromAccount(account, bac);
+            log.info("ACCOUNT UPDATE " + bac);
+            dao.saveAccount(bac);
+        }
+    }
+
+    private BusinessAccount createBusinessAccountFromAccount(final Account account) {
         final List<String> tags = new ArrayList<String>();
         for (final Tag tag : account.getTagList()) {
             tags.add(tag.getTagDefinitionName());
         }
 
-        // TODO Need payment and invoice api to fill most fields
         final BusinessAccount bac = new BusinessAccount(
-            account.getExternalKey(),
-            null, // TODO
-            tags,
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null // TODO
+                account.getExternalKey(),
+                invoiceUserApi.getAccountBalance(account.getId()),
+                tags,
+                // These fields will be updated below
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
         );
+        updateBusinessAccountFromAccount(account, bac);
 
-        log.info("ACCOUNT CREATION " + bac);
-        dao.createAccount(bac);
+        return bac;
     }
 
-    public void accountUpdated(final UUID accountId, final List<ChangedField> changedFields)
-    {
-        // None of the fields updated interest us so far - see DefaultAccountChangeNotification
-        // TODO We'll need notifications for tags changes eventually
+    private void updateBusinessAccountFromAccount(final Account account, final BusinessAccount bac) {
+        DateTime lastInvoiceDate = null;
+        BigDecimal totalInvoiceBalance = BigDecimal.ZERO;
+        String lastPaymentStatus = null;
+        String paymentMethod = null;
+        String creditCardType = null;
+        String billingAddressCountry = null;
+
+        // Retrieve invoices information
+        final List<Invoice> invoices = invoiceUserApi.getInvoicesByAccount(account.getId());
+        if (invoices != null && invoices.size() > 0) {
+            final List<String> invoiceIds = new ArrayList<String>();
+            for (final Invoice invoice : invoices) {
+                invoiceIds.add(invoice.getId().toString());
+                totalInvoiceBalance = totalInvoiceBalance.add(invoice.getBalance());
+
+                if (lastInvoiceDate == null || invoice.getInvoiceDate().isAfter(lastInvoiceDate)) {
+                    lastInvoiceDate = invoice.getInvoiceDate();
+                }
+            }
+
+            // Retrieve payments information for these invoices
+            DateTime lastPaymentDate = null;
+            final List<PaymentInfo> payments = paymentApi.getPaymentInfo(invoiceIds);
+            if (payments != null) {
+                for (final PaymentInfo payment : payments) {
+                    // Use the last payment method/type/country as the default one for the account
+                    if (lastPaymentDate == null || payment.getCreatedDate().isAfter(lastPaymentDate)) {
+                        lastPaymentDate = payment.getCreatedDate();
+
+                        lastPaymentStatus = payment.getStatus();
+                        paymentMethod = payment.getPaymentMethod();
+                        creditCardType = payment.getCardType();
+                        billingAddressCountry = payment.getCardCountry();
+                    }
+                }
+            }
+        }
+
+        bac.setLastPaymentStatus(lastPaymentStatus);
+        bac.setPaymentMethod(paymentMethod);
+        bac.setCreditCardType(creditCardType);
+        bac.setBillingAddressCountry(billingAddressCountry);
+        bac.setLastInvoiceDate(lastInvoiceDate);
+        bac.setTotalInvoiceBalance(totalInvoiceBalance);
+
+        bac.setBalance(invoiceUserApi.getAccountBalance(account.getId()));
     }
 }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
index 25496b5..5c7ffae 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.analytics;
 
+import com.ning.billing.invoice.glue.InvoiceModule;
+import com.ning.billing.payment.setup.PaymentModule;
 import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.account.glue.AccountModule;
 import com.ning.billing.analytics.setup.AnalyticsModule;
@@ -41,6 +43,8 @@ public class AnalyticsTestModule extends AnalyticsModule
         install(new CatalogModule());
         install(new BusModule());
         install(new EntitlementModule());
+        install(new InvoiceModule());
+        install(new PaymentModule());
         install(new ClockModule());
         install(new TagStoreModule());
         install(new NotificationQueueModule());
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index 7153f48..06d954f 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -47,14 +47,23 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
+import com.ning.billing.invoice.api.InvoiceCreationNotification;
+import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
+import com.ning.billing.invoice.dao.InvoiceDao;
+import com.ning.billing.invoice.model.DefaultInvoice;
+import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.tag.DescriptiveTag;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.DefaultClock;
 import com.ning.billing.util.tag.DefaultTagDefinition;
+import com.ning.billing.util.tag.DescriptiveTag;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.dao.TagDefinitionSqlDao;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -62,21 +71,28 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
 import static org.testng.Assert.fail;
 
 @Guice(modules = AnalyticsTestModule.class)
-public class TestAnalyticsService
-{
+public class TestAnalyticsService {
     private static final UUID ID = UUID.randomUUID();
     private static final String KEY = "12345";
     private static final String ACCOUNT_KEY = "pierre-12345";
+    private static final Currency ACCOUNT_CURRENCY = Currency.EUR;
     private static final DefaultTagDefinition TAG_ONE = new DefaultTagDefinition("batch20", "something", "pierre");
     private static final DefaultTagDefinition TAG_TWO = new DefaultTagDefinition("awesome", "something", "pierre");
+    private static final BigDecimal INVOICE_AMOUNT = BigDecimal.valueOf(1243.11);
+    private static final String PAYMENT_METHOD = "Paypal";
+    private static final String CARD_COUNTRY = "France";
+
+    private final Clock clock = new DefaultClock();
 
     @Inject
     private AccountUserApi accountApi;
@@ -88,6 +104,12 @@ public class TestAnalyticsService
     private TagDefinitionSqlDao tagDao;
 
     @Inject
+    private InvoiceDao invoiceDao;
+
+    @Inject
+    private PaymentDao paymentDao;
+
+    @Inject
     private DefaultAnalyticsService service;
 
     @Inject
@@ -106,50 +128,54 @@ public class TestAnalyticsService
     private BusinessSubscriptionTransition expectedTransition;
 
     private AccountCreationNotification accountCreationNotification;
+    private InvoiceCreationNotification invoiceCreationNotification;
+    private PaymentInfo paymentInfoNotification;
 
     @BeforeClass(alwaysRun = true)
-    public void startMysql() throws IOException, ClassNotFoundException, SQLException, EntitlementUserApiException
-    {
+    public void startMysql() throws IOException, ClassNotFoundException, SQLException, EntitlementUserApiException {
         // Killbill generic setup
         setupBusAndMySQL();
 
         tagDao.create(TAG_ONE);
         tagDao.create(TAG_TWO);
 
-        final MockAccount account = new MockAccount(UUID.randomUUID(), ACCOUNT_KEY, Currency.USD);
+        final MockAccount account = new MockAccount(UUID.randomUUID(), ACCOUNT_KEY, ACCOUNT_CURRENCY);
         try {
-            List<Tag> tags = new ArrayList<Tag>();
-            tags.add(new DescriptiveTag(TAG_ONE, "pierre", new DateTime(DateTimeZone.UTC)));
-            tags.add(new DescriptiveTag(TAG_TWO, "pierre", new DateTime(DateTimeZone.UTC)));
+            final List<Tag> tags = new ArrayList<Tag>();
+            tags.add(new DescriptiveTag(TAG_ONE, "pierre", clock.getUTCNow()));
+            tags.add(new DescriptiveTag(TAG_TWO, "pierre", clock.getUTCNow()));
 
             final Account storedAccount = accountApi.createAccount(account, null, tags);
 
             // Create events for the bus and expected results
             createSubscriptionTransitionEvent(storedAccount);
             createAccountCreationEvent(storedAccount);
+            createInvoiceAndPaymentCreationEvents(storedAccount);
         } catch (Throwable t) {
             fail("Initializing accounts failed.", t);
         }
     }
 
-    private void setupBusAndMySQL() throws IOException
-    {
+    private void setupBusAndMySQL() throws IOException {
         bus.start();
 
         final String analyticsDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/analytics/ddl.sql"));
         final String accountDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
         final String entitlementDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
+        final String invoiceDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/invoice/ddl.sql"));
+        final String paymentDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
         final String utilDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
 
         helper.startMysql();
         helper.initDb(analyticsDdl);
         helper.initDb(accountDdl);
         helper.initDb(entitlementDdl);
+        helper.initDb(invoiceDdl);
+        helper.initDb(paymentDdl);
         helper.initDb(utilDdl);
     }
 
-    private void createSubscriptionTransitionEvent(final Account account) throws EntitlementUserApiException
-    {
+    private void createSubscriptionTransitionEvent(final Account account) throws EntitlementUserApiException {
         final SubscriptionBundle bundle = entitlementApi.createBundleForAccount(account.getId(), KEY);
 
         // Verify we correctly initialized the account subsystem
@@ -161,64 +187,86 @@ public class TestAnalyticsService
         final Plan plan = new MockPlan("platinum-monthly", product);
         final PlanPhase phase = new MockPhase(PhaseType.EVERGREEN, plan, MockDuration.UNLIMITED(), 25.95);
         final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveTransitionTime = new DateTime(DateTimeZone.UTC);
-        final DateTime requestedTransitionTime = new DateTime(DateTimeZone.UTC);
+        final DateTime effectiveTransitionTime = clock.getUTCNow();
+        final DateTime requestedTransitionTime = clock.getUTCNow();
         final String priceList = "something";
 
         transition = new SubscriptionTransitionData(
-            ID,
-            subscriptionId,
-            bundle.getId(),
-            EntitlementEvent.EventType.API_USER,
-            ApiEventType.CREATE,
-            requestedTransitionTime,
-            effectiveTransitionTime,
-            null,
-            null,
-            null,
-            null,
-            Subscription.SubscriptionState.ACTIVE,
-            plan,
-            phase,
-            priceList,
-            true
+                ID,
+                subscriptionId,
+                bundle.getId(),
+                EntitlementEvent.EventType.API_USER,
+                ApiEventType.CREATE,
+                requestedTransitionTime,
+                effectiveTransitionTime,
+                null,
+                null,
+                null,
+                null,
+                Subscription.SubscriptionState.ACTIVE,
+                plan,
+                phase,
+                priceList,
+                true
         );
         expectedTransition = new BusinessSubscriptionTransition(
-            ID,
-            KEY,
-            ACCOUNT_KEY,
-            requestedTransitionTime,
-            BusinessSubscriptionEvent.subscriptionCreated(plan),
-            null,
-            new BusinessSubscription(priceList, plan, phase, Currency.USD, effectiveTransitionTime, Subscription.SubscriptionState.ACTIVE, subscriptionId, bundle.getId())
+                ID,
+                KEY,
+                ACCOUNT_KEY,
+                requestedTransitionTime,
+                BusinessSubscriptionEvent.subscriptionCreated(plan),
+                null,
+                new BusinessSubscription(priceList, plan, phase, ACCOUNT_CURRENCY, effectiveTransitionTime, Subscription.SubscriptionState.ACTIVE, subscriptionId, bundle.getId())
         );
     }
 
-    private void createAccountCreationEvent(final Account account)
-    {
+    private void createAccountCreationEvent(final Account account) {
         accountCreationNotification = new DefaultAccountCreationEvent(account);
     }
 
+    private void createInvoiceAndPaymentCreationEvents(final Account account) {
+        final DefaultInvoice invoice = new DefaultInvoice(account.getId(), clock.getUTCNow(), ACCOUNT_CURRENCY, clock);
+        final FixedPriceInvoiceItem invoiceItem = new FixedPriceInvoiceItem(
+                UUID.randomUUID(), invoice.getId(), UUID.randomUUID(), "somePlan", "somePhase", clock.getUTCNow(), clock.getUTCNow().plusDays(1),
+                INVOICE_AMOUNT, ACCOUNT_CURRENCY, clock.getUTCNow(), clock.getUTCNow()
+        );
+        invoice.addInvoiceItem(invoiceItem);
+
+        invoiceDao.create(invoice);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(account.getId()).size(), 1);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(account.getId()).get(0).getInvoiceItems().size(), 1);
+
+        // It doesn't really matter what the events contain - the listener will go back to the db
+        invoiceCreationNotification = new DefaultInvoiceCreationNotification(invoice.getId(), account.getId(),
+                INVOICE_AMOUNT, ACCOUNT_CURRENCY, clock.getUTCNow());
+
+        paymentInfoNotification = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString()).setPaymentMethod(PAYMENT_METHOD).setCardCountry(CARD_COUNTRY).build();
+        final PaymentAttempt paymentAttempt = new PaymentAttempt(UUID.randomUUID(), invoice.getId(), account.getId(), BigDecimal.TEN,
+                ACCOUNT_CURRENCY, clock.getUTCNow(), clock.getUTCNow(), paymentInfoNotification.getPaymentId(), 1, clock.getUTCNow().plusDays(1));
+        paymentDao.createPaymentAttempt(paymentAttempt);
+        paymentDao.savePaymentInfo(paymentInfoNotification);
+        Assert.assertEquals(paymentDao.getPaymentInfo(Arrays.asList(invoice.getId().toString())).size(), 1);
+    }
+
     @AfterClass(alwaysRun = true)
-    public void stopMysql()
-    {
+    public void stopMysql() {
         helper.stopMysql();
     }
 
     @Test(groups = "slow")
-    public void testRegisterForNotifications() throws Exception
-    {
+    public void testRegisterForNotifications() throws Exception {
         // Make sure the service has been instantiated
         Assert.assertEquals(service.getName(), "analytics-service");
 
         // Test the bus and make sure we can register our service
         try {
             service.registerForNotifications();
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
             Assert.fail("Unable to start the bus or service! " + t);
         }
 
+        Assert.assertNull(accountDao.getAccount(ACCOUNT_KEY));
+
         // Send events and wait for the async part...
         bus.post(transition);
         bus.post(accountCreationNotification);
@@ -232,11 +280,24 @@ public class TestAnalyticsService
         Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTags().indexOf(TAG_ONE.getName()) != -1);
         Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTags().indexOf(TAG_TWO.getName()) != -1);
 
+        // Test invoice integration - the account creation notification has triggered a BAC update
+        Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT) == 0);
+
+        // Post the same invoice event again - the invoice balance shouldn't change
+        bus.post(invoiceCreationNotification);
+        Thread.sleep(1000);
+        Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT) == 0);
+
+        // Test payment integration - the fields have already been populated, just make sure the code is exercised
+        bus.post(paymentInfoNotification);
+        Thread.sleep(1000);
+        Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getPaymentMethod(), PAYMENT_METHOD);
+        Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getBillingAddressCountry(), CARD_COUNTRY);
+
         // Test the shutdown sequence
         try {
             bus.stop();
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
             Assert.fail("Unable to stop the bus!");
         }
     }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index fd84927..57c76a0 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -39,7 +39,7 @@ public interface PaymentApi {
 
     List<Either<PaymentError, PaymentInfo>> createPayment(String accountKey, List<String> invoiceIds);
     List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds);
-    Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId);
+    Either<PaymentError, PaymentInfo> createPaymentForPaymentAttempt(UUID paymentAttemptId);
 
     List<Either<PaymentError, PaymentInfo>> createRefund(Account account, List<String> invoiceIds); //TODO
 
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java b/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
index fcccf9b..b099739 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentAttempt.java
@@ -60,7 +60,7 @@ public class PaymentAttempt {
         this.invoiceDate = invoiceDate;
         this.paymentAttemptDate = paymentAttemptDate == null ? new DateTime(DateTimeZone.UTC) : paymentAttemptDate;
         this.paymentId = paymentId;
-        this.retryCount = retryCount;
+        this.retryCount = retryCount == null ? 0 : retryCount;
         this.nextRetryDate = nextRetryDate;
         this.createdDate = createdDate == null ? new DateTime(DateTimeZone.UTC) : createdDate;
         this.updatedDate = updatedDate == null ? new DateTime(DateTimeZone.UTC) : updatedDate;
@@ -285,26 +285,32 @@ public class PaymentAttempt {
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (getClass() == obj.getClass()) {
-            PaymentAttempt other = (PaymentAttempt)obj;
-            if (obj == other) {
-                return true;
-            }
-            else {
-                return Objects.equal(paymentAttemptId, other.paymentAttemptId) &&
-                       Objects.equal(invoiceId, other.invoiceId) &&
-                       Objects.equal(accountId, other.accountId) &&
-                       Objects.equal(amount, other.amount) &&
-                       Objects.equal(currency, other.currency) &&
-                       Objects.equal(invoiceDate, other.invoiceDate) &&
-                       Objects.equal(paymentAttemptDate, other.paymentAttemptDate) &&
-                       Objects.equal(retryCount, other.retryCount) &&
-                       Objects.equal(nextRetryDate, other.nextRetryDate) &&
-                       Objects.equal(paymentId, other.paymentId);
-            }
-        }
-        return false;
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final PaymentAttempt that = (PaymentAttempt) o;
+
+        if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) return false;
+        if (amount != null ? !(amount.compareTo(that.amount) == 0) : that.amount != null) return false;
+        if (createdDate != null ? !(getUnixTimestamp(createdDate) == getUnixTimestamp(that.createdDate)) : that.createdDate != null) return false;
+        if (currency != that.currency) return false;
+        if (invoiceDate != null ? !(getUnixTimestamp(invoiceDate) == getUnixTimestamp(that.invoiceDate)) : that.invoiceDate != null) return false;
+        if (invoiceId != null ? !invoiceId.equals(that.invoiceId) : that.invoiceId != null) return false;
+        if (nextRetryDate != null ? !(getUnixTimestamp(nextRetryDate) == getUnixTimestamp(that.nextRetryDate)) : that.nextRetryDate != null)
+            return false;
+        if (paymentAttemptDate != null ? !(getUnixTimestamp(paymentAttemptDate) == getUnixTimestamp(that.paymentAttemptDate)) : that.paymentAttemptDate != null)
+            return false;
+        if (paymentAttemptId != null ? !paymentAttemptId.equals(that.paymentAttemptId) : that.paymentAttemptId != null)
+            return false;
+        if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) return false;
+        if (retryCount != null ? !retryCount.equals(that.retryCount) : that.retryCount != null) return false;
+        if (updatedDate != null ? !(getUnixTimestamp(updatedDate) == getUnixTimestamp(that.updatedDate)) : that.updatedDate != null) return false;
+
+        return true;
     }
 
+    private static long getUnixTimestamp(final DateTime dateTime) {
+        return dateTime.getMillis() / 1000;
+    }
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentError.java b/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
index f1474c8..d9b8c49 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
@@ -15,6 +15,8 @@
  */
 
 package com.ning.billing.payment.api;
+import java.util.UUID;
+
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 import org.codehaus.jackson.annotate.JsonTypeInfo.Id;
 
@@ -24,15 +26,21 @@ import com.ning.billing.util.bus.BusEvent;
 public class PaymentError implements BusEvent {
     private final String type;
     private final String message;
+    private final UUID accountId;
+    private final UUID invoiceId;
 
-    public PaymentError(PaymentError src) {
+    public PaymentError(PaymentError src, UUID accountId, UUID invoiceId) {
         this.type = src.type;
         this.message = src.message;
+        this.accountId = accountId;
+        this.invoiceId = invoiceId;
     }
 
-    public PaymentError(String type, String message) {
+    public PaymentError(String type, String message, UUID accountId, UUID invoiceId) {
         this.type = type;
         this.message = message;
+        this.accountId = accountId;
+        this.invoiceId = invoiceId;
     }
 
     public String getType() {
@@ -43,10 +51,20 @@ public class PaymentError implements BusEvent {
         return message;
     }
 
+    public UUID getInvoiceId() {
+        return invoiceId;
+    }
+
+    public UUID getAccountId() {
+        return accountId;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
+        result = prime * result + ((accountId == null) ? 0 : accountId.hashCode());
+        result = prime * result + ((invoiceId == null) ? 0 : invoiceId.hashCode());
         result = prime * result + ((message == null) ? 0 : message.hashCode());
         result = prime * result + ((type == null) ? 0 : type.hashCode());
         return result;
@@ -61,6 +79,18 @@ public class PaymentError implements BusEvent {
         if (getClass() != obj.getClass())
             return false;
         PaymentError other = (PaymentError) obj;
+        if (accountId == null) {
+            if (other.accountId != null)
+                return false;
+        }
+        else if (!accountId.equals(other.accountId))
+            return false;
+        if (invoiceId == null) {
+            if (other.invoiceId != null)
+                return false;
+        }
+        else if (!invoiceId.equals(other.invoiceId))
+            return false;
         if (message == null) {
             if (other.message != null)
                 return false;
@@ -78,6 +108,7 @@ public class PaymentError implements BusEvent {
 
     @Override
     public String toString() {
-        return "PaymentError [type=" + type + ", message=" + message + "]";
+        return "PaymentError [type=" + type + ", message=" + message + ", accountId=" + accountId + ", invoiceId=" + invoiceId + "]";
     }
+
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentInfo.java b/api/src/main/java/com/ning/billing/payment/api/PaymentInfo.java
index 943c5f7..b0fbca6 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentInfo.java
@@ -310,31 +310,34 @@ public class PaymentInfo implements BusEvent {
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (getClass() == obj.getClass()) {
-            PaymentInfo other = (PaymentInfo)obj;
-            if (obj == other) {
-                return true;
-            }
-            else {
-                return Objects.equal(amount, other.amount) &&
-                       Objects.equal(bankIdentificationNumber, other.bankIdentificationNumber) &&
-                       Objects.equal(paymentId, other.paymentId) &&
-                       Objects.equal(paymentNumber, other.paymentNumber) &&
-                       Objects.equal(referenceId, other.referenceId) &&
-                       Objects.equal(refundAmount, other.refundAmount) &&
-                       Objects.equal(status, other.status) &&
-                       Objects.equal(type, other.type) &&
-                       Objects.equal(paymentMethodId, other.paymentMethodId) &&
-                       Objects.equal(paymentMethod, other.paymentMethod) &&
-                       Objects.equal(cardType, other.cardType) &&
-                       Objects.equal(cardCoutry, other.cardCoutry) &&
-                       Objects.equal(effectiveDate, other.effectiveDate) &&
-                       Objects.equal(createdDate, other.createdDate) &&
-                       Objects.equal(updatedDate, other.updatedDate);
-            }
-        }
-        return false;
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        final PaymentInfo that = (PaymentInfo) o;
+
+        if (amount != null ? !(amount.compareTo(that.amount) == 0) : that.amount != null) return false;
+        if (bankIdentificationNumber != null ? !bankIdentificationNumber.equals(that.bankIdentificationNumber) : that.bankIdentificationNumber != null)
+            return false;
+        if (cardCoutry != null ? !cardCoutry.equals(that.cardCoutry) : that.cardCoutry != null) return false;
+        if (cardType != null ? !cardType.equals(that.cardType) : that.cardType != null) return false;
+        if (createdDate != null ? !(getUnixTimestamp(createdDate) == getUnixTimestamp(that.createdDate)) : that.createdDate != null) return false;
+        if (effectiveDate != null ? !(getUnixTimestamp(effectiveDate) == getUnixTimestamp(that.effectiveDate)) : that.effectiveDate != null)
+            return false;
+        if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) return false;
+        if (paymentMethod != null ? !paymentMethod.equals(that.paymentMethod) : that.paymentMethod != null)
+            return false;
+        if (paymentMethodId != null ? !paymentMethodId.equals(that.paymentMethodId) : that.paymentMethodId != null)
+            return false;
+        if (paymentNumber != null ? !paymentNumber.equals(that.paymentNumber) : that.paymentNumber != null)
+            return false;
+        if (referenceId != null ? !referenceId.equals(that.referenceId) : that.referenceId != null) return false;
+        if (refundAmount != null ? !refundAmount.equals(that.refundAmount) : that.refundAmount != null) return false;
+        if (status != null ? !status.equals(that.status) : that.status != null) return false;
+        if (type != null ? !type.equals(that.type) : that.type != null) return false;
+        if (updatedDate != null ? !(getUnixTimestamp(updatedDate) == getUnixTimestamp(that.updatedDate)) : that.updatedDate != null) return false;
+
+        return true;
     }
 
     @Override
@@ -342,4 +345,7 @@ public class PaymentInfo implements BusEvent {
         return "PaymentInfo [paymentId=" + paymentId + ", amount=" + amount + ", refundAmount=" + refundAmount + ", paymentNumber=" + paymentNumber + ", bankIdentificationNumber=" + bankIdentificationNumber + ", status=" + status + ", type=" + type + ", referenceId=" + referenceId + ", paymentMethodId=" + paymentMethodId + ", paymentMethod=" + paymentMethod + ", cardType=" + cardType + ", cardCountry=" + cardCoutry + ", effectiveDate=" + effectiveDate + ", createdDate=" + createdDate + ", updatedDate=" + updatedDate + "]";
     }
 
+    private static long getUnixTimestamp(final DateTime dateTime) {
+        return dateTime.getMillis() / 1000;
+    }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.java
index 330784e..9fc593c 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.java
@@ -88,6 +88,8 @@ public interface FixedPriceInvoiceItemSqlDao extends EntityDao<InvoiceItem> {
                         q.bind("endDate", item.getEndDate().toDate());
                         q.bind("amount", item.getAmount());
                         q.bind("currency", item.getCurrency().toString());
+                        q.bind("createdDate", item.getCreatedDate().toDate());
+                        q.bind("updatedDate", item.getUpdatedDate().toDate());
                     }
                 };
             }
@@ -106,9 +108,11 @@ public interface FixedPriceInvoiceItemSqlDao extends EntityDao<InvoiceItem> {
             DateTime endDate = new DateTime(result.getTimestamp("end_date"));
             BigDecimal amount = result.getBigDecimal("amount");
             Currency currency = Currency.valueOf(result.getString("currency"));
+            DateTime createdDate = new DateTime(result.getTimestamp("created_date"));
+            DateTime updatedDate = new DateTime(result.getTimestamp("updated_date"));
 
             return new FixedPriceInvoiceItem(id, invoiceId, subscriptionId, planName, phaseName,
-                                            startDate, endDate, amount, currency);
+                                            startDate, endDate, amount, currency, createdDate, updatedDate);
         }
     }
 }
\ No newline at end of file
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.java
index 3409cfe..053eca4 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.java
@@ -65,7 +65,7 @@ public interface RecurringInvoiceItemSqlDao extends EntityDao<InvoiceItem> {
     @SqlUpdate
     void update(@RecurringInvoiceItemBinder final InvoiceItem invoiceItem);
 
-    @SqlBatch(transactional=false)
+    @SqlBatch(transactional = false)
     void batchCreateFromTransaction(@RecurringInvoiceItemBinder final List<InvoiceItem> items);
 
     @BindingAnnotation(RecurringInvoiceItemBinder.InvoiceItemBinderFactory.class)
@@ -89,6 +89,8 @@ public interface RecurringInvoiceItemSqlDao extends EntityDao<InvoiceItem> {
                         q.bind("rate", item.getRate());
                         q.bind("currency", item.getCurrency().toString());
                         q.bind("reversedItemId", (item.getReversedItemId() == null) ? null : item.getReversedItemId().toString());
+                        q.bind("createdDate", item.getCreatedDate().toDate());
+                        q.bind("updatedDate", item.getUpdatedDate().toDate());
                     }
                 };
             }
@@ -110,9 +112,11 @@ public interface RecurringInvoiceItemSqlDao extends EntityDao<InvoiceItem> {
             Currency currency = Currency.valueOf(result.getString("currency"));
             String reversedItemString = result.getString("reversed_item_id");
             UUID reversedItemId = (reversedItemString == null) ? null : UUID.fromString(reversedItemString);
+            DateTime createdDate = new DateTime(result.getTimestamp("created_date"));
+            DateTime updatedDate = new DateTime(result.getTimestamp("updated_date"));
 
             return new RecurringInvoiceItem(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate,
-                                           amount, rate, currency, reversedItemId);
+                    amount, rate, currency, reversedItemId, createdDate, updatedDate);
         }
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
index a8a1ecc..2d58ecf 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/DefaultInvoiceGenerator.java
@@ -16,28 +16,27 @@
 
 package com.ning.billing.invoice.model;
 
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.catalog.api.BillingPeriod;
 import com.ning.billing.catalog.api.CatalogApiException;
+import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.catalog.api.Duration;
 import com.ning.billing.catalog.api.InternationalPrice;
-import com.ning.billing.entitlement.api.billing.BillingModeType;
-import com.ning.billing.invoice.api.InvoiceApiException;
-import org.joda.time.DateTime;
-import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.entitlement.api.billing.BillingEvent;
+import com.ning.billing.entitlement.api.billing.BillingModeType;
 import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceApiException;
 import com.ning.billing.invoice.api.InvoiceItem;
 import com.ning.billing.util.clock.Clock;
+import org.joda.time.DateTime;
 
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
 import javax.annotation.Nullable;
 
 public class DefaultInvoiceGenerator implements InvoiceGenerator {
@@ -84,7 +83,7 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
             }
         }
 
-        if (proposedItems == null || proposedItems.size()  == 0) {
+        if (proposedItems == null || proposedItems.size() == 0) {
             return null;
         } else {
             invoice.addInvoiceItems(proposedItems);
@@ -92,7 +91,7 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
         }
     }
 
-   /*
+    /*
     * removes all matching items from both submitted collections
     */
     private void removeDuplicatedInvoiceItems(final List<InvoiceItem> proposedItems,
@@ -188,10 +187,10 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
                         BigDecimal amount = itemDatum.getNumberOfCycles().multiply(rate).setScale(NUMBER_OF_DECIMALS, ROUNDING_MODE);
 
                         RecurringInvoiceItem recurringItem = new RecurringInvoiceItem(invoiceId, thisEvent.getSubscription().getId(),
-                                                                                      thisEvent.getPlan().getName(),
-                                                                                      thisEvent.getPlanPhase().getName(),
-                                                                                      itemDatum.getStartDate(), itemDatum.getEndDate(),
-                                                                                      amount, rate, currency);
+                                thisEvent.getPlan().getName(),
+                                thisEvent.getPlanPhase().getName(),
+                                itemDatum.getStartDate(), itemDatum.getEndDate(),
+                                amount, rate, currency, clock.getUTCNow(), clock.getUTCNow());
                         items.add(recurringItem);
                     }
                 }
@@ -223,8 +222,9 @@ public class DefaultInvoiceGenerator implements InvoiceGenerator {
                     DateTime endDate = duration.addToDateTime(thisEvent.getEffectiveDate());
                     BigDecimal fixedPrice = thisEvent.getFixedPrice().getPrice(currency);
                     fixedPriceInvoiceItem = new FixedPriceInvoiceItem(invoiceId, thisEvent.getSubscription().getId(),
-                                                                      thisEvent.getPlan().getName(), thisEvent.getPlanPhase().getName(),
-                                                                      thisEvent.getEffectiveDate(), endDate, fixedPrice, currency);
+                            thisEvent.getPlan().getName(), thisEvent.getPlanPhase().getName(),
+                            thisEvent.getEffectiveDate(), endDate, fixedPrice, currency,
+                            clock.getUTCNow(), clock.getUTCNow());
                 } catch (CatalogApiException e) {
                     throw new InvoiceApiException(e, ErrorCode.CAT_NO_PRICE_FOR_CURRENCY, currency.toString());
                 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/FixedPriceInvoiceItem.java b/invoice/src/main/java/com/ning/billing/invoice/model/FixedPriceInvoiceItem.java
index 2265abd..f53c1a3 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/FixedPriceInvoiceItem.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/FixedPriceInvoiceItem.java
@@ -24,12 +24,16 @@ import java.math.BigDecimal;
 import java.util.UUID;
 
 public class FixedPriceInvoiceItem extends InvoiceItemBase {
-    public FixedPriceInvoiceItem(UUID invoiceId, UUID subscriptionId, String planName, String phaseName, DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency) {
-        super(invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency);
+    public FixedPriceInvoiceItem(UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
+                                 DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency,
+                                 DateTime createdDate, DateTime updatedDate) {
+        super(invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency, createdDate, updatedDate);
     }
 
-    public FixedPriceInvoiceItem(UUID id, UUID invoiceId, UUID subscriptionId, String planName, String phaseName, DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency) {
-        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency);
+    public FixedPriceInvoiceItem(UUID id, UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
+                                 DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency,
+                                 DateTime createdDate, DateTime updatedDate) {
+        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency, createdDate, updatedDate);
     }
 
     @Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/InvoiceItemBase.java b/invoice/src/main/java/com/ning/billing/invoice/model/InvoiceItemBase.java
index 45aa26d..924ed0c 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/InvoiceItemBase.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/InvoiceItemBase.java
@@ -33,15 +33,19 @@ public abstract class InvoiceItemBase implements InvoiceItem {
     protected final DateTime endDate;
     protected final BigDecimal amount;
     protected final Currency currency;
+    protected final DateTime createdDate;
+    protected final DateTime updatedDate;
 
     public InvoiceItemBase(UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
-                           DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency) {
+                           DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency,
+                           DateTime createdDate, DateTime updatedDate) {
         this(UUID.randomUUID(), invoiceId, subscriptionId, planName, phaseName,
-             startDate, endDate, amount, currency);
+                startDate, endDate, amount, currency, createdDate, updatedDate);
     }
 
     public InvoiceItemBase(UUID id, UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
-                           DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency) {
+                           DateTime startDate, DateTime endDate, BigDecimal amount, Currency currency,
+                           DateTime createdDate, DateTime updatedDate) {
         this.id = id;
         this.invoiceId = invoiceId;
         this.subscriptionId = subscriptionId;
@@ -51,6 +55,16 @@ public abstract class InvoiceItemBase implements InvoiceItem {
         this.endDate = endDate;
         this.amount = amount;
         this.currency = currency;
+        this.createdDate = createdDate;
+        this.updatedDate = updatedDate;
+    }
+
+    public DateTime getCreatedDate() {
+        return createdDate;
+    }
+
+    public DateTime getUpdatedDate() {
+        return updatedDate;
     }
 
     @Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/model/RecurringInvoiceItem.java b/invoice/src/main/java/com/ning/billing/invoice/model/RecurringInvoiceItem.java
index 93ef474..db73939 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/model/RecurringInvoiceItem.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/model/RecurringInvoiceItem.java
@@ -30,24 +30,27 @@ public class RecurringInvoiceItem extends InvoiceItemBase {
     public RecurringInvoiceItem(UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
                                 DateTime startDate, DateTime endDate,
                                 BigDecimal amount, BigDecimal rate,
-                                Currency currency) {
+                                Currency currency,
+                                DateTime createdDate, DateTime updatedDate) {
         this(UUID.randomUUID(), invoiceId, subscriptionId, planName, phaseName, startDate, endDate,
-             amount, rate, currency);
+                amount, rate, currency, createdDate, updatedDate);
     }
 
     public RecurringInvoiceItem(UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
                                 DateTime startDate, DateTime endDate,
                                 BigDecimal amount, BigDecimal rate,
-                                Currency currency, UUID reversedItemId) {
+                                Currency currency, UUID reversedItemId,
+                                DateTime createdDate, DateTime updatedDate) {
         this(UUID.randomUUID(), invoiceId, subscriptionId, planName, phaseName, startDate, endDate,
-             amount, rate, currency, reversedItemId);
+                amount, rate, currency, reversedItemId, createdDate, updatedDate);
     }
 
     public RecurringInvoiceItem(UUID id, UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
                                 DateTime startDate, DateTime endDate,
                                 BigDecimal amount, BigDecimal rate,
-                                Currency currency) {
-        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency);
+                                Currency currency,
+                                DateTime createdDate, DateTime updatedDate) {
+        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency, createdDate, updatedDate);
 
         this.rate = rate;
         this.reversedItemId = null;
@@ -56,8 +59,9 @@ public class RecurringInvoiceItem extends InvoiceItemBase {
     public RecurringInvoiceItem(UUID id, UUID invoiceId, UUID subscriptionId, String planName, String phaseName,
                                 DateTime startDate, DateTime endDate,
                                 BigDecimal amount, BigDecimal rate,
-                                Currency currency, UUID reversedItemId) {
-        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency);
+                                Currency currency, UUID reversedItemId,
+                                DateTime createdDate, DateTime updatedDate) {
+        super(id, invoiceId, subscriptionId, planName, phaseName, startDate, endDate, amount, currency, createdDate, updatedDate);
 
         this.rate = rate;
         this.reversedItemId = reversedItemId;
@@ -67,7 +71,7 @@ public class RecurringInvoiceItem extends InvoiceItemBase {
     public InvoiceItem asCredit() {
         BigDecimal amountNegated = amount == null ? null : amount.negate();
         return new RecurringInvoiceItem(invoiceId, subscriptionId, planName, phaseName, startDate, endDate,
-                                        amountNegated, rate, currency, id);
+                amountNegated, rate, currency, id, createdDate, updatedDate);
     }
 
     @Override
@@ -89,8 +93,12 @@ public class RecurringInvoiceItem extends InvoiceItemBase {
 
     @Override
     public int compareTo(InvoiceItem item) {
-        if (item == null) {return -1;}
-        if (!(item instanceof RecurringInvoiceItem)) {return -1;}
+        if (item == null) {
+            return -1;
+        }
+        if (!(item instanceof RecurringInvoiceItem)) {
+            return -1;
+        }
 
         RecurringInvoiceItem that = (RecurringInvoiceItem) item;
 
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
index 79fd3db..c1d14ef 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/FixedPriceInvoiceItemSqlDao.sql.stg
@@ -10,7 +10,8 @@ fields(prefix) ::= <<
   <prefix>end_date,
   <prefix>amount,
   <prefix>currency,
-  <prefix>created_date
+  <prefix>created_date,
+  <prefix>updated_date
 >>
 
 getById() ::= <<
@@ -41,19 +42,19 @@ getInvoiceItemsBySubscription() ::= <<
 create() ::= <<
   INSERT INTO fixed_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName,
-         :startDate, :endDate, :amount, :currency, NOW());
+         :startDate, :endDate, :amount, :currency, :createdDate, :updatedDate);
 >>
 
 batchCreateFromTransaction() ::= <<
   INSERT INTO fixed_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName,
-         :startDate, :endDate, :amount, :currency, NOW());
+         :startDate, :endDate, :amount, :currency, :createdDate, :updatedDate);
 >>
 
 update() ::= <<
   UPDATE fixed_invoice_items
   SET invoice_id = :invoiceId, subscription_id = :subscriptionId, plan_name = :planName, phase_name = :phaseName,
-      start_date = :startDate, end_date = :endDate, amount = :amount, currency = :currency
+      start_date = :startDate, end_date = :endDate, amount = :amount, currency = :currency, updated_date = :updatedDate
   WHERE id = :id;
 >>
 
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg b/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
index a5291e8..0fcbae1 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
+++ b/invoice/src/main/resources/com/ning/billing/invoice/dao/RecurringInvoiceItemSqlDao.sql.stg
@@ -12,7 +12,8 @@ fields(prefix) ::= <<
   <prefix>rate,
   <prefix>currency,
   <prefix>reversed_item_id,
-  <prefix>created_date
+  <prefix>created_date,
+  <prefix>updated_date
 >>
 
 getById() ::= <<
@@ -43,20 +44,20 @@ getInvoiceItemsBySubscription() ::= <<
 create() ::= <<
   INSERT INTO recurring_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
-         :amount, :rate, :currency, :reversedItemId, NOW());
+         :amount, :rate, :currency, :reversedItemId, :createdDate, :updatedDate);
 >>
 
 batchCreateFromTransaction() ::= <<
   INSERT INTO recurring_invoice_items(<fields()>)
   VALUES(:id, :invoiceId, :subscriptionId, :planName, :phaseName, :startDate, :endDate,
-         :amount, :rate, :currency, :reversedItemId, NOW());
+         :amount, :rate, :currency, :reversedItemId, :createdDate, :updatedDate);
 >>
 
 update() ::= <<
   UPDATE recurring_invoice_items
   SET invoice_id = :invoiceId, subscription_id = :subscriptionId, plan_name = :planName, phase_name = :phaseName,
       start_date = :startDate, end_date = :endDate, amount = :amount, rate = :rate, currency = :currency,
-      reversed_item_id = :reversedItemId
+      reversed_item_id = :reversedItemId, updated_date = :updatedDate
   WHERE id = :id;
 >>
 
diff --git a/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql b/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
index e210044..b9d3faa 100644
--- a/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
+++ b/invoice/src/main/resources/com/ning/billing/invoice/ddl.sql
@@ -13,6 +13,7 @@ CREATE TABLE recurring_invoice_items (
   currency char(3) NOT NULL,
   reversed_item_id char(36),
   created_date datetime NOT NULL,
+  updated_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE INDEX recurring_invoice_items_subscription_id ON recurring_invoice_items(subscription_id ASC);
@@ -30,6 +31,7 @@ CREATE TABLE fixed_invoice_items (
   amount numeric(10,4) NULL,
   currency char(3) NOT NULL,
   created_date datetime NOT NULL,
+  updated_date datetime NOT NULL,
   PRIMARY KEY(id)
 ) ENGINE=innodb;
 CREATE INDEX fixed_invoice_items_subscription_id ON fixed_invoice_items(subscription_id ASC);
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
index 711665a..4f936fd 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTests.java
@@ -35,10 +35,10 @@ import com.ning.billing.invoice.api.InvoicePayment;
 import com.ning.billing.invoice.model.BillingEventSet;
 import com.ning.billing.invoice.model.DefaultInvoice;
 import com.ning.billing.invoice.model.DefaultInvoiceGenerator;
-import com.ning.billing.invoice.model.RecurringInvoiceItem;
 import com.ning.billing.invoice.model.DefaultInvoicePayment;
 import com.ning.billing.invoice.model.InvoiceGenerator;
 import com.ning.billing.invoice.model.InvoiceItemList;
+import com.ning.billing.invoice.model.RecurringInvoiceItem;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.DefaultClock;
 import org.joda.time.DateTime;
@@ -50,8 +50,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
-import static org.testng.Assert.*;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 @Test(groups = {"invoicing", "invoicing-invoiceDao"})
 public class InvoiceDaoTests extends InvoiceDaoTestBase {
@@ -85,7 +87,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         UUID subscriptionId = UUID.randomUUID();
         DateTime startDate = new DateTime(2010, 1, 1, 0, 0, 0, 0);
         DateTime endDate = new DateTime(2010, 4, 1, 0, 0, 0, 0);
-        InvoiceItem invoiceItem = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, endDate, new BigDecimal("21.00"), new BigDecimal("7.00"), Currency.USD);
+        InvoiceItem invoiceItem = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, endDate,
+                new BigDecimal("21.00"), new BigDecimal("7.00"), Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         invoice.addInvoiceItem(invoiceItem);
         invoiceDao.create(invoice);
 
@@ -182,7 +185,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         BigDecimal rate = new BigDecimal("9.0");
         BigDecimal amount = rate.multiply(new BigDecimal("3.0"));
 
-        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", targetDate, endDate, amount, rate, Currency.USD);
+        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", targetDate, endDate,
+                amount, rate, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         invoice.addInvoiceItem(item);
         invoiceDao.create(invoice);
 
@@ -245,7 +249,7 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
     }
 
     private List<Invoice> getInvoicesDueForPaymentAttempt(final List<Invoice> invoices, final DateTime date) {
-        List<Invoice> invoicesDue= new ArrayList<Invoice>();
+        List<Invoice> invoicesDue = new ArrayList<Invoice>();
 
         for (final Invoice invoice : invoices) {
             if (invoice.isDueForPayment(date, NUMBER_OF_DAY_BETWEEN_RETRIES)) {
@@ -260,10 +264,14 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
     public void testGetInvoicesBySubscription() {
         UUID accountId = UUID.randomUUID();
 
-        UUID subscriptionId1 = UUID.randomUUID(); BigDecimal rate1 = new BigDecimal("17.0");
-        UUID subscriptionId2 = UUID.randomUUID(); BigDecimal rate2 = new BigDecimal("42.0");
-        UUID subscriptionId3 = UUID.randomUUID(); BigDecimal rate3 = new BigDecimal("3.0");
-        UUID subscriptionId4 = UUID.randomUUID(); BigDecimal rate4 = new BigDecimal("12.0");
+        UUID subscriptionId1 = UUID.randomUUID();
+        BigDecimal rate1 = new BigDecimal("17.0");
+        UUID subscriptionId2 = UUID.randomUUID();
+        BigDecimal rate2 = new BigDecimal("42.0");
+        UUID subscriptionId3 = UUID.randomUUID();
+        BigDecimal rate3 = new BigDecimal("3.0");
+        UUID subscriptionId4 = UUID.randomUUID();
+        BigDecimal rate4 = new BigDecimal("12.0");
 
         DateTime targetDate = new DateTime(2011, 5, 23, 0, 0, 0, 0);
 
@@ -277,16 +285,20 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         DateTime startDate = new DateTime(2011, 3, 1, 0, 0, 0, 0);
         DateTime endDate = startDate.plusMonths(1);
 
-        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoiceId1, subscriptionId1, "test plan", "test A", startDate, endDate, rate1, rate1, Currency.USD);
+        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoiceId1, subscriptionId1, "test plan", "test A", startDate, endDate,
+                rate1, rate1, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item1);
 
-        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoiceId1, subscriptionId2, "test plan", "test B", startDate, endDate, rate2, rate2, Currency.USD);
+        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoiceId1, subscriptionId2, "test plan", "test B", startDate, endDate,
+                rate2, rate2, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item2);
 
-        RecurringInvoiceItem item3 = new RecurringInvoiceItem(invoiceId1, subscriptionId3, "test plan", "test C", startDate, endDate, rate3, rate3, Currency.USD);
+        RecurringInvoiceItem item3 = new RecurringInvoiceItem(invoiceId1, subscriptionId3, "test plan", "test C", startDate, endDate,
+                rate3, rate3, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item3);
 
-        RecurringInvoiceItem item4 = new RecurringInvoiceItem(invoiceId1, subscriptionId4, "test plan", "test D", startDate, endDate, rate4, rate4, Currency.USD);
+        RecurringInvoiceItem item4 = new RecurringInvoiceItem(invoiceId1, subscriptionId4, "test plan", "test D", startDate, endDate,
+                rate4, rate4, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item4);
 
         // create invoice 2 (subscriptions 1-3)
@@ -298,13 +310,16 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         startDate = endDate;
         endDate = startDate.plusMonths(1);
 
-        RecurringInvoiceItem item5 = new RecurringInvoiceItem(invoiceId2, subscriptionId1, "test plan", "test phase A", startDate, endDate, rate1, rate1, Currency.USD);
+        RecurringInvoiceItem item5 = new RecurringInvoiceItem(invoiceId2, subscriptionId1, "test plan", "test phase A", startDate, endDate,
+                rate1, rate1, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item5);
 
-        RecurringInvoiceItem item6 = new RecurringInvoiceItem(invoiceId2, subscriptionId2, "test plan", "test phase B", startDate, endDate, rate2, rate2, Currency.USD);
+        RecurringInvoiceItem item6 = new RecurringInvoiceItem(invoiceId2, subscriptionId2, "test plan", "test phase B", startDate, endDate,
+                rate2, rate2, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item6);
 
-        RecurringInvoiceItem item7 = new RecurringInvoiceItem(invoiceId2, subscriptionId3, "test plan", "test phase C", startDate, endDate, rate3, rate3, Currency.USD);
+        RecurringInvoiceItem item7 = new RecurringInvoiceItem(invoiceId2, subscriptionId3, "test plan", "test phase C", startDate, endDate,
+                rate3, rate3, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item7);
 
         // check that each subscription returns the correct number of invoices
@@ -363,10 +378,12 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         BigDecimal rate1 = new BigDecimal("17.0");
         BigDecimal rate2 = new BigDecimal("42.0");
 
-        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate, endDate, rate1, rate1, Currency.USD);
+        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate,
+                endDate, rate1, rate1, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item1);
 
-        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate, endDate, rate2, rate2, Currency.USD);
+        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate,
+                endDate, rate2, rate2, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item2);
 
         BigDecimal payment1 = new BigDecimal("48.0");
@@ -390,10 +407,12 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         BigDecimal rate1 = new BigDecimal("17.0");
         BigDecimal rate2 = new BigDecimal("42.0");
 
-        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate, endDate, rate1, rate1, Currency.USD);
+        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate, endDate,
+                rate1, rate1, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item1);
 
-        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate, endDate, rate2, rate2, Currency.USD);
+        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate, endDate,
+                rate2, rate2, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item2);
 
         BigDecimal balance = invoiceDao.getAccountBalance(accountId);
@@ -428,10 +447,12 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         BigDecimal rate1 = new BigDecimal("17.0");
         BigDecimal rate2 = new BigDecimal("42.0");
 
-        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate, endDate, rate1, rate1, Currency.USD);
+        RecurringInvoiceItem item1 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase A", startDate, endDate,
+                rate1, rate1, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item1);
 
-        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate, endDate, rate2, rate2, Currency.USD);
+        RecurringInvoiceItem item2 = new RecurringInvoiceItem(invoice1.getId(), UUID.randomUUID(), "test plan", "test phase B", startDate, endDate,
+                rate2, rate2, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item2);
 
         DateTime upToDate;
@@ -454,7 +475,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
 
         BigDecimal rate3 = new BigDecimal("21.0");
 
-        RecurringInvoiceItem item3 = new RecurringInvoiceItem(invoice2.getId(), UUID.randomUUID(), "test plan", "test phase C", startDate2, endDate2, rate3, rate3, Currency.USD);
+        RecurringInvoiceItem item3 = new RecurringInvoiceItem(invoice2.getId(), UUID.randomUUID(), "test plan", "test phase C", startDate2, endDate2,
+                rate3, rate3, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item3);
 
         upToDate = new DateTime(2011, 1, 1, 0, 0, 0, 0);
@@ -489,8 +511,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         Subscription subscription = new MockSubscription();
         DateTime effectiveDate1 = new DateTime(2011, 2, 1, 0, 0, 0, 0);
         BillingEvent event1 = new DefaultBillingEvent(subscription, effectiveDate1, plan1, phase1, null,
-                                                      recurringPrice, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
-                                                      "testEvent1", SubscriptionTransitionType.CREATE);
+                recurringPrice, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
+                "testEvent1", SubscriptionTransitionType.CREATE);
 
         BillingEventSet events = new BillingEventSet();
         events.add(event1);
@@ -507,8 +529,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
 
         DateTime effectiveDate2 = new DateTime(2011, 2, 15, 0, 0, 0, 0);
         BillingEvent event2 = new DefaultBillingEvent(subscription, effectiveDate2, plan2, phase2, null,
-                                                      recurringPrice2, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
-                                                      "testEvent2", SubscriptionTransitionType.CREATE);
+                recurringPrice2, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
+                "testEvent2", SubscriptionTransitionType.CREATE);
         events.add(event2);
 
         // second invoice should be for one half (14/28 days) the difference between the rate plans
@@ -538,8 +560,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         DateTime effectiveDate = buildDateTime(2011, 1, 1);
 
         BillingEvent event = new DefaultBillingEvent(subscription, effectiveDate, plan, phase, null,
-                                                     recurringPrice, BillingPeriod.MONTHLY, 15, BillingModeType.IN_ADVANCE,
-                                                     "testEvent", SubscriptionTransitionType.CREATE);
+                recurringPrice, BillingPeriod.MONTHLY, 15, BillingModeType.IN_ADVANCE,
+                "testEvent", SubscriptionTransitionType.CREATE);
         BillingEventSet events = new BillingEventSet();
         events.add(event);
 
@@ -568,8 +590,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         DateTime effectiveDate1 = buildDateTime(2011, 1, 1);
 
         BillingEvent event1 = new DefaultBillingEvent(subscription, effectiveDate1, plan, phase1, fixedPrice,
-                                                     null, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
-                                                     "testEvent1", SubscriptionTransitionType.CREATE);
+                null, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
+                "testEvent1", SubscriptionTransitionType.CREATE);
         BillingEventSet events = new BillingEventSet();
         events.add(event1);
 
@@ -583,8 +605,8 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
 
         DateTime effectiveDate2 = effectiveDate1.plusDays(30);
         BillingEvent event2 = new DefaultBillingEvent(subscription, effectiveDate2, plan, phase2, null,
-                                                     recurringPrice, BillingPeriod.MONTHLY, 31, BillingModeType.IN_ADVANCE,
-                                                     "testEvent2", SubscriptionTransitionType.CHANGE);
+                recurringPrice, BillingPeriod.MONTHLY, 31, BillingModeType.IN_ADVANCE,
+                "testEvent2", SubscriptionTransitionType.CHANGE);
         events.add(event2);
 
         Invoice invoice2 = generator.generateInvoice(UUID.randomUUID(), events, existingItems, effectiveDate2, Currency.USD);
@@ -625,15 +647,15 @@ public class InvoiceDaoTests extends InvoiceDaoTestBase {
         DateTime effectiveDate1 = buildDateTime(2011, 1, 1);
 
         BillingEvent event1 = new DefaultBillingEvent(subscription, effectiveDate1, plan, phase1, fixedPrice,
-                                                     null, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
-                                                     "testEvent1", SubscriptionTransitionType.CREATE);
+                null, BillingPeriod.MONTHLY, 1, BillingModeType.IN_ADVANCE,
+                "testEvent1", SubscriptionTransitionType.CREATE);
         BillingEventSet events = new BillingEventSet();
         events.add(event1);
 
         DateTime effectiveDate2 = effectiveDate1.plusDays(30);
         BillingEvent event2 = new DefaultBillingEvent(subscription, effectiveDate2, plan, phase2, null,
-                                                     recurringPrice, BillingPeriod.MONTHLY, 31, BillingModeType.IN_ADVANCE,
-                                                     "testEvent2", SubscriptionTransitionType.CHANGE);
+                recurringPrice, BillingPeriod.MONTHLY, 31, BillingModeType.IN_ADVANCE,
+                "testEvent2", SubscriptionTransitionType.CHANGE);
         events.add(event2);
 
         InvoiceGenerator generator = new DefaultInvoiceGenerator(clock);
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceItemDaoTests.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceItemDaoTests.java
index 16e95fb..2f79c8b 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceItemDaoTests.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceItemDaoTests.java
@@ -22,7 +22,6 @@ import com.ning.billing.invoice.model.DefaultInvoice;
 import com.ning.billing.invoice.model.RecurringInvoiceItem;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.DefaultClock;
-
 import org.joda.time.DateTime;
 import org.testng.annotations.Test;
 
@@ -38,7 +37,7 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
 
     private final Clock clock = new DefaultClock();
 
-    @Test
+    @Test(groups = "slow")
     public void testInvoiceItemCreation() {
         UUID invoiceId = UUID.randomUUID();
         UUID subscriptionId = UUID.randomUUID();
@@ -46,7 +45,9 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         DateTime endDate = new DateTime(2011, 11, 1, 0, 0, 0, 0);
         BigDecimal rate = new BigDecimal("20.00");
 
-        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, endDate, rate, rate, Currency.USD);
+        final DateTime expectedCreatedDate = clock.getUTCNow();
+        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, endDate,
+                rate, rate, Currency.USD, expectedCreatedDate, expectedCreatedDate);
         recurringInvoiceItemDao.create(item);
 
         RecurringInvoiceItem thisItem = (RecurringInvoiceItem) recurringInvoiceItemDao.getById(item.getId().toString());
@@ -59,9 +60,26 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         assertEquals(thisItem.getAmount().compareTo(item.getRate()), 0);
         assertEquals(thisItem.getRate().compareTo(item.getRate()), 0);
         assertEquals(thisItem.getCurrency(), item.getCurrency());
+        assertEquals(thisItem.getCreatedDate(), item.getCreatedDate());
+        assertEquals(thisItem.getUpdatedDate(), item.getUpdatedDate());
+        assertEquals(thisItem.getUpdatedDate(), thisItem.getUpdatedDate());
+        assertEquals(thisItem.getUpdatedDate(), expectedCreatedDate);
+
+        // Try to update the object and check the updated_date column
+        final DateTime updatedDate = clock.getUTCNow().plusDays(10);
+        RecurringInvoiceItem expectedUpdatedItem = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, endDate,
+                rate, rate, Currency.USD, expectedCreatedDate, updatedDate);
+        recurringInvoiceItemDao.update(item);
+
+        RecurringInvoiceItem updatedItem = (RecurringInvoiceItem) recurringInvoiceItemDao.getById(item.getId().toString());
+        assertNotNull(updatedItem);
+        assertEquals(updatedItem.getId(), item.getId());
+        assertEquals(updatedItem.getCreatedDate(), item.getCreatedDate());
+        assertEquals(updatedItem.getUpdatedDate(), expectedUpdatedItem.getUpdatedDate());
+        assertEquals(updatedItem.getUpdatedDate(), updatedDate);
     }
 
-    @Test
+    @Test(groups = "slow")
     public void testGetInvoiceItemsBySubscriptionId() {
         UUID subscriptionId = UUID.randomUUID();
         DateTime startDate = new DateTime(2011, 3, 1, 0, 0, 0, 0);
@@ -69,7 +87,8 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
 
         for (int i = 0; i < 3; i++) {
             UUID invoiceId = UUID.randomUUID();
-            RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate.plusMonths(i), startDate.plusMonths(i + 1), rate, rate, Currency.USD);
+            RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate.plusMonths(i), startDate.plusMonths(i + 1),
+                    rate, rate, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
             recurringInvoiceItemDao.create(item);
         }
 
@@ -77,7 +96,7 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         assertEquals(items.size(), 3);
     }
 
-    @Test
+    @Test(groups = "slow")
     public void testGetInvoiceItemsByInvoiceId() {
         UUID invoiceId = UUID.randomUUID();
         DateTime startDate = new DateTime(2011, 3, 1, 0, 0, 0, 0);
@@ -86,7 +105,8 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         for (int i = 0; i < 5; i++) {
             UUID subscriptionId = UUID.randomUUID();
             BigDecimal amount = rate.multiply(new BigDecimal(i + 1));
-            RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, startDate.plusMonths(1), amount, amount, Currency.USD);
+            RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, startDate.plusMonths(1),
+                    amount, amount, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
             recurringInvoiceItemDao.create(item);
         }
 
@@ -94,7 +114,7 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         assertEquals(items.size(), 5);
     }
 
-    @Test
+    @Test(groups = "slow")
     public void testGetInvoiceItemsByAccountId() {
         UUID accountId = UUID.randomUUID();
         DateTime targetDate = new DateTime(2011, 5, 23, 0, 0, 0, 0);
@@ -107,7 +127,8 @@ public class InvoiceItemDaoTests extends InvoiceDaoTestBase {
         BigDecimal rate = new BigDecimal("20.00");
 
         UUID subscriptionId = UUID.randomUUID();
-        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, startDate.plusMonths(1), rate, rate, Currency.USD);
+        RecurringInvoiceItem item = new RecurringInvoiceItem(invoiceId, subscriptionId, "test plan", "test phase", startDate, startDate.plusMonths(1),
+                rate, rate, Currency.USD, clock.getUTCNow(), clock.getUTCNow());
         recurringInvoiceItemDao.create(item);
 
         List<InvoiceItem> items = recurringInvoiceItemDao.getInvoiceItemsByAccount(accountId.toString());
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index 3b1a6f9..4dd5040 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -25,7 +25,6 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +34,7 @@ import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
 import com.ning.billing.invoice.model.DefaultInvoicePayment;
+import com.ning.billing.payment.RetryService;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.PaymentProviderPlugin;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
@@ -44,6 +44,7 @@ public class DefaultPaymentApi implements PaymentApi {
     private final PaymentProviderPluginRegistry pluginRegistry;
     private final AccountUserApi accountUserApi;
     private final InvoicePaymentApi invoicePaymentApi;
+    private final RetryService retryService;
     private final PaymentDao paymentDao;
     private final PaymentConfig config;
 
@@ -53,11 +54,13 @@ public class DefaultPaymentApi implements PaymentApi {
     public DefaultPaymentApi(PaymentProviderPluginRegistry pluginRegistry,
                              AccountUserApi accountUserApi,
                              InvoicePaymentApi invoicePaymentApi,
+                             RetryService retryService,
                              PaymentDao paymentDao,
                              PaymentConfig config) {
         this.pluginRegistry = pluginRegistry;
         this.accountUserApi = accountUserApi;
         this.invoicePaymentApi = invoicePaymentApi;
+        this.retryService = retryService;
         this.paymentDao = paymentDao;
         this.config = config;
     }
@@ -134,7 +137,7 @@ public class DefaultPaymentApi implements PaymentApi {
     }
 
     @Override
-    public Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId) {
+    public Either<PaymentError, PaymentInfo> createPaymentForPaymentAttempt(UUID paymentAttemptId) {
         PaymentAttempt paymentAttempt = paymentDao.getPaymentAttemptById(paymentAttemptId);
 
         if (paymentAttempt != null) {
@@ -142,17 +145,23 @@ public class DefaultPaymentApi implements PaymentApi {
             Account account = accountUserApi.getAccountById(paymentAttempt.getAccountId());
 
             if (invoice != null && account != null) {
-                if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
+                if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
                     // TODO: send a notification that invoice was ignored?
                     log.info("Received invoice for payment with outstanding amount of 0 {} ", invoice);
-                    Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
+                    return Either.left(new PaymentError("invoice_balance_0",
+                                                        "Invoice balance was 0 or less",
+                                                        paymentAttempt.getAccountId(),
+                                                        paymentAttempt.getInvoiceId()));
                 }
                 else {
                     return processPayment(getPaymentProviderPlugin(account), account, invoice, paymentAttempt);
                 }
             }
         }
-        return Either.left(new PaymentError("retry_payment_error", "Could not load payment attempt, invoice or account for id " + paymentAttemptId));
+        return Either.left(new PaymentError("retry_payment_error",
+                                            "Could not load payment attempt, invoice or account for id " + paymentAttemptId,
+                                            paymentAttempt.getAccountId(),
+                                            paymentAttempt.getInvoiceId()));
     }
 
     @Override
@@ -164,10 +173,14 @@ public class DefaultPaymentApi implements PaymentApi {
         for (String invoiceId : invoiceIds) {
             Invoice invoice = invoicePaymentApi.getInvoice(UUID.fromString(invoiceId));
 
-            if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
+            if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
                 // TODO: send a notification that invoice was ignored?
                 log.info("Received invoice for payment with balance of 0 {} ", invoice);
-                Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
+                Either<PaymentError, PaymentInfo> result = Either.left(new PaymentError("invoice_balance_0",
+                                                                                        "Invoice balance was 0 or less",
+                                                                                        account.getId(),
+                                                                                        UUID.fromString(invoiceId)));
+                processedPaymentsOrErrors.add(result);
             }
             else {
                 PaymentAttempt paymentAttempt = paymentDao.createPaymentAttempt(invoice);
@@ -234,7 +247,7 @@ public class DefaultPaymentApi implements PaymentApi {
 
         if (retryCount < retryDays.size()) {
             int retryInDays = 0;
-            DateTime nextRetryDate = new DateTime(DateTimeZone.UTC);
+            DateTime nextRetryDate = paymentAttempt.getPaymentAttemptDate();
 
             try {
                 retryInDays = retryDays.get(retryCount);
@@ -244,6 +257,7 @@ public class DefaultPaymentApi implements PaymentApi {
                 log.error("Could not get retry day for retry count {}", retryCount);
             }
 
+            retryService.scheduleRetry(paymentAttempt, nextRetryDate);
             paymentDao.updatePaymentAttemptWithRetryInfo(paymentAttempt.getPaymentAttemptId(), retryCount + 1, nextRetryDate);
         }
         else if (retryCount == retryDays.size()) {
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
index a86db13..49a4165 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
@@ -16,23 +16,28 @@
 
 package com.ning.billing.payment.dao;
 
+import java.util.Date;
 import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.util.clock.Clock;
 
 public class DefaultPaymentDao implements PaymentDao {
     private final PaymentSqlDao sqlDao;
+    private final Clock clock;
 
     @Inject
-    public DefaultPaymentDao(IDBI dbi) {
+    public DefaultPaymentDao(IDBI dbi, Clock clock) {
         this.sqlDao = dbi.onDemand(PaymentSqlDao.class);
+        this.clock = clock;
     }
 
     @Override
@@ -66,27 +71,37 @@ public class DefaultPaymentDao implements PaymentDao {
 
     @Override
     public void updatePaymentAttemptWithPaymentId(UUID paymentAttemptId, String paymentId) {
-        sqlDao.updatePaymentAttemptWithPaymentId(paymentAttemptId.toString(), paymentId);
+        sqlDao.updatePaymentAttemptWithPaymentId(paymentAttemptId.toString(), paymentId, clock.getUTCNow().toDate());
     }
 
     @Override
     public void updatePaymentInfo(String type, String paymentId, String cardType, String cardCountry) {
-        sqlDao.updatePaymentInfo(type, paymentId, cardType, cardCountry);
+        sqlDao.updatePaymentInfo(type, paymentId, cardType, cardCountry, clock.getUTCNow().toDate());
     }
 
     @Override
     public List<PaymentInfo> getPaymentInfo(List<String> invoiceIds) {
-        return sqlDao.getPaymentInfos(invoiceIds);
+        if (invoiceIds == null || invoiceIds.size() == 0) {
+            return ImmutableList.<PaymentInfo>of();
+        } else {
+            return sqlDao.getPaymentInfos(invoiceIds);
+        }
     }
 
     @Override
     public List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(List<String> invoiceIds) {
-        return sqlDao.getPaymentAttemptsForInvoiceIds(invoiceIds);
+        if (invoiceIds == null || invoiceIds.size() == 0) {
+            return ImmutableList.<PaymentAttempt>of();
+        } else {
+            return sqlDao.getPaymentAttemptsForInvoiceIds(invoiceIds);
+        }
     }
 
     @Override
     public void updatePaymentAttemptWithRetryInfo(UUID paymentAttemptId, int retryCount, DateTime nextRetryDate) {
-        sqlDao.updatePaymentAttemptWithRetryInfo(paymentAttemptId.toString(), retryCount, nextRetryDate);
+
+        final Date retryDate = nextRetryDate == null ? null : nextRetryDate.toDate();
+        sqlDao.updatePaymentAttemptWithRetryInfo(paymentAttemptId.toString(), retryCount, retryDate, clock.getUTCNow().toDate());
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
index 9919d34..ea564f3 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
@@ -71,18 +71,22 @@ public interface PaymentSqlDao extends Transactional<PaymentSqlDao>, CloseMe, Tr
 
     @SqlUpdate
     void updatePaymentAttemptWithPaymentId(@Bind("payment_attempt_id") String paymentAttemptId,
-                                           @Bind("payment_id") String paymentId);
+                                           @Bind("payment_id") String paymentId,
+                                           @Bind("updated_dt") Date updatedDate);
 
     @SqlUpdate
     void updatePaymentAttemptWithRetryInfo(@Bind("payment_attempt_id") String paymentAttemptId,
                                            @Bind("retry_count") int retryCount,
-                                           @Bind("next_retry_dt") DateTime nextRetryDate);
+                                           @Bind("next_retry_dt") Date nextRetryDate,
+                                           @Bind("updated_dt") Date updatedDate);
 
     @SqlUpdate
+
     void updatePaymentInfo(@Bind("payment_method") String paymentMethod,
                            @Bind("payment_id") String paymentId,
                            @Bind("card_type") String cardType,
-                           @Bind("card_country") String cardCountry);
+                           @Bind("card_country") String cardCountry,
+                           @Bind("updated_dt") Date updatedDate);
 
     @SqlQuery
     @Mapper(PaymentInfoMapper.class)
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
index 71c7e41..280d1e0 100644
--- a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
+++ b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
@@ -53,7 +53,10 @@ public class NoOpPaymentProviderPlugin implements PaymentProviderPlugin {
 
     @Override
     public Either<PaymentError, String> createPaymentProviderAccount(Account account) {
-        return Either.left(new PaymentError("unsupported", "Account creation not supported in this plugin"));
+        return Either.left(new PaymentError("unsupported",
+                                            "Account creation not supported in this plugin",
+                                            account.getId(),
+                                            null));
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/RetryService.java b/payment/src/main/java/com/ning/billing/payment/RetryService.java
index bbe8a61..ee57397 100644
--- a/payment/src/main/java/com/ning/billing/payment/RetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/RetryService.java
@@ -19,7 +19,6 @@ package com.ning.billing.payment;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.google.inject.Inject;
 import com.ning.billing.lifecycle.KillbillService;
@@ -82,7 +81,7 @@ public class RetryService implements KillbillService {
          }
     }
 
-    public void scheduleRetry(Transmogrifier transactionalDao, PaymentAttempt paymentAttempt, DateTime timeOfRetry) {
+    public void scheduleRetry(PaymentAttempt paymentAttempt, DateTime timeOfRetry) {
         final String id = paymentAttempt.getPaymentAttemptId().toString();
 
         NotificationKey key = new NotificationKey() {
@@ -91,14 +90,14 @@ public class RetryService implements KillbillService {
                 return id;
             }
         };
-        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key);
+        retryQueue.recordFutureNotification(timeOfRetry, key);
     }
 
     private void retry(String paymentAttemptId) {
         PaymentInfo paymentInfo = paymentApi.getPaymentInfoForPaymentAttemptId(paymentAttemptId);
 
-        if (paymentInfo != null && !PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
-            paymentApi.createPayment(UUID.fromString(paymentAttemptId));
+        if (paymentInfo == null || !PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
+            paymentApi.createPaymentForPaymentAttempt(UUID.fromString(paymentAttemptId));
         }
     }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
index 85641ed..d3d9320 100644
--- a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
@@ -64,5 +64,6 @@ public class PaymentModule extends AbstractModule {
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
         installPaymentDao();
+        installRetryEngine();
     }
 }
diff --git a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
index 6576ecb..b349b5e 100644
--- a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -65,7 +65,7 @@ getPaymentAttemptForInvoiceId() ::= <<
 updatePaymentAttemptWithPaymentId() ::= <<
     UPDATE payment_attempts
        SET payment_id = :payment_id,
-           updated_dt = NOW()
+           updated_dt = :updated_dt
      WHERE payment_attempt_id = :payment_attempt_id
 >>
 
@@ -79,7 +79,7 @@ updatePaymentInfo() ::= <<
        SET payment_method = :payment_method,
            card_type = :card_type,
            card_country = :card_country,
-           updated_dt = NOW()
+           updated_dt = :updated_dt
      WHERE payment_id = :payment_id
 >>
 
@@ -87,7 +87,7 @@ updatePaymentAttemptWithRetryInfo() ::= <<
     UPDATE payment_attempts
        SET retry_count = :retry_count,
            next_retry_dt = :next_retry_dt,
-           updated_dt = NOW()
+           updated_dt = :updated_dt
      WHERE payment_attempt_id = :payment_attempt_id
 >>
 
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index 2ef26e4..39ae479 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -77,7 +77,9 @@ public abstract class TestPaymentApi {
                                                        now.plusMonths(1),
                                                        amount,
                                                        new BigDecimal("1.0"),
-                                                       Currency.USD));
+                                                       Currency.USD,
+                                                       now,
+                                                       now));
 
         List<Either<PaymentError, PaymentInfo>> results = paymentApi.createPayment(account.getExternalKey(), Arrays.asList(invoice.getId().toString()));
 
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index b245e55..10b81da 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Collections2;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
+import org.joda.time.DateTimeZone;
 
 public class MockPaymentDao implements PaymentDao {
     private final Map<String, PaymentInfo> payments = new ConcurrentHashMap<String, PaymentInfo>();
@@ -86,7 +87,13 @@ public class MockPaymentDao implements PaymentDao {
     public void updatePaymentInfo(String paymentMethodType, String paymentId, String cardType, String cardCountry) {
         PaymentInfo existingPayment = payments.get(paymentId);
         if (existingPayment != null) {
-            PaymentInfo payment = existingPayment.cloner().setPaymentMethod(paymentMethodType).setCardType(cardType).setCardCountry(cardCountry).build();
+            PaymentInfo payment = existingPayment.cloner()
+                    .setPaymentMethod(paymentMethodType)
+                    .setCardType(cardType)
+                    .setCardCountry(cardCountry)
+                    // TODO pass the clock?
+                    .setUpdatedDate(new DateTime(DateTimeZone.UTC))
+                    .build();
             payments.put(paymentId, payment);
         }
     }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index 18b0a15..6318d28 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -16,19 +16,18 @@
 
 package com.ning.billing.payment.dao;
 
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.UUID;
-
+import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.ning.billing.account.api.AccountApiException;
-import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.payment.api.PaymentAttempt;
-import com.ning.billing.payment.api.PaymentInfo;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.UUID;
 
 public abstract class TestPaymentDao {
 
@@ -37,35 +36,35 @@ public abstract class TestPaymentDao {
     @Test
     public void testCreatePayment() {
         PaymentInfo paymentInfo = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
-                                                           .setAmount(BigDecimal.TEN)
-                                                           .setStatus("Processed")
-                                                           .setBankIdentificationNumber("1234")
-                                                           .setPaymentNumber("12345")
-                                                           .setPaymentMethodId("12345")
-                                                           .setReferenceId("12345")
-                                                           .setType("Electronic")
-                                                           .setCreatedDate(new DateTime(DateTimeZone.UTC))
-                                                           .setUpdatedDate(new DateTime(DateTimeZone.UTC))
-                                                           .setEffectiveDate(new DateTime(DateTimeZone.UTC))
-                                                           .build();
+                .setAmount(BigDecimal.TEN)
+                .setStatus("Processed")
+                .setBankIdentificationNumber("1234")
+                .setPaymentNumber("12345")
+                .setPaymentMethodId("12345")
+                .setReferenceId("12345")
+                .setType("Electronic")
+                .setCreatedDate(new DateTime(DateTimeZone.UTC))
+                .setUpdatedDate(new DateTime(DateTimeZone.UTC))
+                .setEffectiveDate(new DateTime(DateTimeZone.UTC))
+                .build();
 
         paymentDao.savePaymentInfo(paymentInfo);
     }
 
     @Test
-    public void testUpdatePayment() {
+    public void testUpdatePaymenInfo() {
         PaymentInfo paymentInfo = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
-                                                           .setAmount(BigDecimal.TEN)
-                                                           .setStatus("Processed")
-                                                           .setBankIdentificationNumber("1234")
-                                                           .setPaymentNumber("12345")
-                                                           .setPaymentMethodId("12345")
-                                                           .setReferenceId("12345")
-                                                           .setType("Electronic")
-                                                           .setCreatedDate(new DateTime(DateTimeZone.UTC))
-                                                           .setUpdatedDate(new DateTime(DateTimeZone.UTC))
-                                                           .setEffectiveDate(new DateTime(DateTimeZone.UTC))
-                                                           .build();
+                .setAmount(BigDecimal.TEN)
+                .setStatus("Processed")
+                .setBankIdentificationNumber("1234")
+                .setPaymentNumber("12345")
+                .setPaymentMethodId("12345")
+                .setReferenceId("12345")
+                .setType("Electronic")
+                .setCreatedDate(new DateTime(DateTimeZone.UTC))
+                .setUpdatedDate(new DateTime(DateTimeZone.UTC))
+                .setEffectiveDate(new DateTime(DateTimeZone.UTC))
+                .build();
 
         paymentDao.savePaymentInfo(paymentInfo);
 
@@ -74,6 +73,24 @@ public abstract class TestPaymentDao {
     }
 
     @Test
+    public void testUpdatePaymentAttempt() {
+        PaymentAttempt paymentAttempt = new PaymentAttempt.Builder().setPaymentAttemptId(UUID.randomUUID())
+                .setPaymentId(UUID.randomUUID().toString())
+                .setInvoiceId(UUID.randomUUID())
+                .setAccountId(UUID.randomUUID())
+                .setAmount(BigDecimal.TEN)
+                .setCurrency(Currency.USD)
+                .setInvoiceDate(new DateTime(DateTimeZone.UTC))
+                .setCreatedDate(new DateTime(DateTimeZone.UTC))
+                .setUpdatedDate(new DateTime(DateTimeZone.UTC))
+                .build();
+
+        paymentDao.createPaymentAttempt(paymentAttempt);
+
+        paymentDao.updatePaymentAttemptWithRetryInfo(paymentAttempt.getPaymentAttemptId(), 1, paymentAttempt.getCreatedDate().plusDays(1));
+    }
+
+    @Test
     public void testGetPaymentForInvoice() throws AccountApiException {
         final UUID invoiceId = UUID.randomUUID();
         final UUID paymentAttemptId = UUID.randomUUID();
@@ -81,7 +98,8 @@ public abstract class TestPaymentDao {
         final String paymentId = UUID.randomUUID().toString();
         final BigDecimal invoiceAmount = BigDecimal.TEN;
 
-        final DateTime now = new DateTime(DateTimeZone.UTC);
+        // Move the clock backwards to test the updated_date field (see below)
+        final DateTime now = new DateTime(DateTimeZone.UTC).minusDays(1);
 
         PaymentAttempt originalPaymenAttempt = new PaymentAttempt(paymentAttemptId, invoiceId, accountId, invoiceAmount, Currency.USD, now, now, paymentId, null, null);
 
@@ -100,22 +118,25 @@ public abstract class TestPaymentDao {
         Assert.assertEquals(attempt3, attempt4);
 
         PaymentInfo originalPaymentInfo = new PaymentInfo.Builder().setPaymentId(paymentId)
-                                                           .setAmount(invoiceAmount)
-                                                           .setStatus("Processed")
-                                                           .setBankIdentificationNumber("1234")
-                                                           .setPaymentNumber("12345")
-                                                           .setPaymentMethodId("12345")
-                                                           .setReferenceId("12345")
-                                                           .setType("Electronic")
-                                                           .setCreatedDate(now)
-                                                           .setUpdatedDate(now)
-                                                           .setEffectiveDate(now)
-                                                           .build();
+                .setAmount(invoiceAmount)
+                .setStatus("Processed")
+                .setBankIdentificationNumber("1234")
+                .setPaymentNumber("12345")
+                .setPaymentMethodId("12345")
+                .setReferenceId("12345")
+                .setType("Electronic")
+                .setCreatedDate(now)
+                .setUpdatedDate(now)
+                .setEffectiveDate(now)
+                .build();
 
         paymentDao.savePaymentInfo(originalPaymentInfo);
         PaymentInfo paymentInfo = paymentDao.getPaymentInfo(Arrays.asList(invoiceId.toString())).get(0);
+        Assert.assertEquals(paymentInfo, originalPaymentInfo);
 
-        Assert.assertEquals(originalPaymentInfo, paymentInfo);
+        paymentDao.updatePaymentInfo(originalPaymentInfo.getPaymentMethod(), originalPaymentInfo.getPaymentId(), originalPaymentInfo.getCardType(), originalPaymentInfo.getCardCountry());
+        paymentInfo = paymentDao.getPaymentInfo(Arrays.asList(invoiceId.toString())).get(0);
+        Assert.assertEquals(paymentInfo.getCreatedDate().getMillis() / 1000, originalPaymentInfo.getCreatedDate().getMillis() / 1000);
+        Assert.assertTrue(paymentInfo.getUpdatedDate().isAfter(originalPaymentInfo.getUpdatedDate()));
     }
-
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
index 19ca39d..a5da11b 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDaoWithEmbeddedDb.java
@@ -18,6 +18,7 @@ package com.ning.billing.payment.dao;
 
 import java.io.IOException;
 
+import com.ning.billing.util.clock.DefaultClock;
 import org.apache.commons.io.IOUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,6 +46,6 @@ public class TestPaymentDaoWithEmbeddedDb extends TestPaymentDao {
 
     @BeforeMethod(alwaysRun = true)
     public void setUp() throws IOException {
-        paymentDao = new DefaultPaymentDao(helper.getDBI());
+        paymentDao = new DefaultPaymentDao(helper.getDBI(), new DefaultClock());
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 375bcfd..dee2a28 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
@@ -39,24 +40,34 @@ import com.ning.billing.payment.api.PaymentProviderAccount;
 import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
 
 public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
+    private final AtomicBoolean makeNextInvoiceFail = new AtomicBoolean(false);
     private final Map<String, PaymentInfo> payments = new ConcurrentHashMap<String, PaymentInfo>();
     private final Map<String, PaymentProviderAccount> accounts = new ConcurrentHashMap<String, PaymentProviderAccount>();
     private final Map<String, PaymentMethodInfo> paymentMethods = new ConcurrentHashMap<String, PaymentMethodInfo>();
 
+    public void makeNextInvoiceFail() {
+        makeNextInvoiceFail.set(true);
+    }
+
     @Override
     public Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice) {
-        PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
-                                             .setAmount(invoice.getBalance())
-                                             .setStatus("Processed")
-                                             .setBankIdentificationNumber("1234")
-                                             .setCreatedDate(new DateTime())
-                                             .setEffectiveDate(new DateTime())
-                                             .setPaymentNumber("12345")
-                                             .setReferenceId("12345")
-                                             .setType("Electronic")
-                                             .build();
-        payments.put(payment.getPaymentId(), payment);
-        return Either.right(payment);
+        if (makeNextInvoiceFail.getAndSet(false)) {
+            return Either.left(new PaymentError("unknown", "test error", account.getId(), invoice.getId()));
+        }
+        else {
+            PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
+                                                 .setAmount(invoice.getBalance())
+                                                 .setStatus("Processed")
+                                                 .setBankIdentificationNumber("1234")
+                                                 .setCreatedDate(new DateTime())
+                                                 .setEffectiveDate(new DateTime())
+                                                 .setPaymentNumber("12345")
+                                                 .setReferenceId("12345")
+                                                 .setType("Electronic")
+                                                 .build();
+            payments.put(payment.getPaymentId(), payment);
+            return Either.right(payment);
+        }
     }
 
     @Override
@@ -64,7 +75,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
         PaymentInfo payment = payments.get(paymentId);
 
         if (payment == null) {
-            return Either.left(new PaymentError("notfound", "No payment found for id " + paymentId));
+            return Either.left(new PaymentError("notfound", "No payment found for id " + paymentId, null, null));
         }
         else {
             return Either.right(payment);
@@ -83,7 +94,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             return Either.right(id);
         }
         else {
-            return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account"));
+            return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account", null, null));
         }
     }
 
@@ -93,7 +104,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             return Either.right(accounts.get(accountKey));
         }
         else {
-            return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey));
+            return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey, null, null));
         }
     }
 
@@ -125,7 +136,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                     realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setId(paymentMethodId).build();
                 }
                 if (realPaymentMethod == null) {
-                    return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+                    return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin", null, null));
                 }
                 else {
                     if (shouldBeDefault) {
@@ -136,11 +147,11 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                 }
             }
                 else {
-                    return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey));
+                    return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey, null, null));
                 }
         }
         else {
-            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey, null, null));
         }
     }
 
@@ -184,7 +195,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                 realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).build();
             }
             if (realPaymentMethod == null) {
-                return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+                return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin", null, null));
             }
             else {
                 paymentMethods.put(paymentMethod.getId(), paymentMethod);
@@ -192,7 +203,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             }
         }
         else {
-            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey, null, null));
         }
     }
 
@@ -202,11 +213,11 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
         if (paymentMethodInfo != null) {
             if (Boolean.FALSE.equals(paymentMethodInfo.getDefaultMethod()) || paymentMethodInfo.getDefaultMethod() == null) {
                 if (paymentMethods.remove(paymentMethodId) == null) {
-                    return Either.left(new PaymentError("unknown", "Did not get any result back"));
+                    return Either.left(new PaymentError("unknown", "Did not get any result back", null, null));
                 }
             }
             else {
-                return Either.left(new PaymentError("error", "Cannot delete default payment method"));
+                return Either.left(new PaymentError("error", "Cannot delete default payment method", null, null));
             }
         }
         return Either.right(null);
@@ -215,7 +226,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     @Override
     public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
         if (paymentMethodId == null) {
-            return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId));
+            return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId, null, null));
         }
 
         return Either.right(paymentMethods.get(paymentMethodId));
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
index 31fdae3..97aa31e 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.payment.setup;
 
-import com.ning.billing.util.bus.Bus;
 import org.apache.commons.collections.MapUtils;
 
 import com.google.common.collect.ImmutableMap;
@@ -24,8 +23,10 @@ import com.google.inject.Provider;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.mock.BrainDeadProxyFactory;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
-import com.ning.billing.payment.setup.PaymentTestModuleWithMocks.MockProvider;
+import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.InMemoryBus;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
 	public static class MockProvider implements Provider<EntitlementBillingApi> {
@@ -33,9 +34,9 @@ public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
 		public EntitlementBillingApi get() {
 			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
 		}
-		
+
 	}
-	
+
 	public PaymentTestModuleWithEmbeddedDb() {
         super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
     }
@@ -49,6 +50,7 @@ public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
     protected void configure() {
         super.configure();
         bind(Bus.class).to(InMemoryBus.class).asEagerSingleton();
-        bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
+        bind(EntitlementBillingApi.class).toProvider(MockProvider.class);
+        bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
index 3dfc637..c8f79bc 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
@@ -31,6 +31,8 @@ import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.InMemoryBus;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class PaymentTestModuleWithMocks extends PaymentModule {
 	public static class MockProvider implements Provider<EntitlementBillingApi> {
@@ -38,12 +40,13 @@ public class PaymentTestModuleWithMocks extends PaymentModule {
 		public EntitlementBillingApi get() {
 			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
 		}
-		
+
 	}
-	
-	
+
+
     public PaymentTestModuleWithMocks() {
-        super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
+        super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock",
+                                                    "killbill.payment.engine.events.off", "false")));
     }
 
     @Override
@@ -65,5 +68,6 @@ public class PaymentTestModuleWithMocks extends PaymentModule {
         bind(MockInvoiceDao.class).asEagerSingleton();
         bind(InvoiceDao.class).to(MockInvoiceDao.class);
         bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
+        bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index 4430eb2..1ffda5c 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -93,7 +93,9 @@ public class TestHelper {
                                                                recurringInvoiceItem.getEndDate(),
                                                                recurringInvoiceItem.getAmount(),
                                                                recurringInvoiceItem.getRate(),
-                                                               recurringInvoiceItem.getCurrency()));
+                                                               recurringInvoiceItem.getCurrency(),
+                                                               recurringInvoiceItem.getCreatedDate(),
+                                                               recurringInvoiceItem.getUpdatedDate()));
             }
         }
         invoiceDao.create(invoice);
@@ -104,7 +106,8 @@ public class TestHelper {
         final DateTime now = new DateTime(DateTimeZone.UTC);
         final UUID subscriptionId = UUID.randomUUID();
         final BigDecimal amount = new BigDecimal("10.00");
-        final InvoiceItem item = new RecurringInvoiceItem(null, subscriptionId, "test plan", "test phase", now, now.plusMonths(1), amount, new BigDecimal("1.0"), Currency.USD);
+        final InvoiceItem item = new RecurringInvoiceItem(null, subscriptionId, "test plan", "test phase", now, now.plusMonths(1),
+                amount, new BigDecimal("1.0"), Currency.USD, now, now);
 
         return createTestInvoice(account, now, Currency.USD, item);
     }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
index c1e62d8..888e017 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
@@ -26,7 +26,6 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.concurrent.Callable;
 
-import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
@@ -46,6 +45,7 @@ import com.ning.billing.account.glue.AccountModule;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
+import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentError;
@@ -54,6 +54,7 @@ import com.ning.billing.payment.setup.PaymentTestModuleWithEmbeddedDb;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.Bus.EventBusException;
 import com.ning.billing.util.clock.MockClockModule;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class TestPaymentInvoiceIntegration {
     // create payment for received invoice and save it -- positive and negative
@@ -69,6 +70,8 @@ public class TestPaymentInvoiceIntegration {
     private PaymentApi paymentApi;
     @Inject
     private TestHelper testHelper;
+    @Inject
+    private NotificationQueueService notificationQueueService;
 
     private MockPaymentInfoReceiver paymentInfoReceiver;
 
diff --git a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
new file mode 100644
index 0000000..6a08eb5
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.glue.AccountModuleWithMocks;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
+import com.ning.billing.invoice.model.RecurringInvoiceItem;
+import com.ning.billing.payment.api.Either;
+import com.ning.billing.payment.api.PaymentApi;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentError;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.api.PaymentStatus;
+import com.ning.billing.payment.dao.PaymentDao;
+import com.ning.billing.payment.provider.MockPaymentProviderPlugin;
+import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.setup.PaymentConfig;
+import com.ning.billing.payment.setup.PaymentTestModuleWithMocks;
+import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.notificationq.MockNotificationQueue;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+@Guice(modules = { PaymentTestModuleWithMocks.class, AccountModuleWithMocks.class, InvoiceModuleWithMocks.class })
+@Test(groups = "fast")
+public class TestRetryService {
+    @Inject
+    private PaymentConfig paymentConfig;
+    @Inject
+    private Bus eventBus;
+    @Inject
+    private PaymentApi paymentApi;
+    @Inject
+    private TestHelper testHelper;
+    @Inject
+    private PaymentProviderPluginRegistry registry;
+    @Inject
+    private PaymentDao paymentDao;
+    @Inject
+    private RetryService retryService;
+    @Inject
+    private NotificationQueueService notificationQueueService;
+
+    private MockPaymentProviderPlugin mockPaymentProviderPlugin;
+    private MockNotificationQueue mockNotificationQueue;
+
+    @BeforeClass(alwaysRun = true)
+    public void initialize() throws Exception {
+        retryService.initialize();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        eventBus.start();
+        retryService.start();
+
+        mockPaymentProviderPlugin = (MockPaymentProviderPlugin)registry.getPlugin(null);
+        mockNotificationQueue = (MockNotificationQueue)notificationQueueService.getNotificationQueue(RetryService.SERVICE_NAME, RetryService.QUEUE_NAME);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        retryService.stop();
+        eventBus.stop();
+    }
+
+    @Test
+    public void testSchedulesRetry() throws Exception {
+        final DateTime now = new DateTime(DateTimeZone.UTC);
+        final Account account = testHelper.createTestCreditCardAccount();
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+        final BigDecimal amount = new BigDecimal("10.00");
+        final UUID subscriptionId = UUID.randomUUID();
+
+        invoice.addInvoiceItem(new RecurringInvoiceItem(invoice.getId(),
+                                                       subscriptionId,
+                                                       "test plan", "test phase",
+                                                       now,
+                                                       now.plusMonths(1),
+                                                       amount,
+                                                       new BigDecimal("1.0"),
+                                                       Currency.USD,
+                                                       new DateTime(DateTimeZone.UTC),
+                                                       new DateTime(DateTimeZone.UTC)));
+
+        mockPaymentProviderPlugin.makeNextInvoiceFail();
+
+        List<Either<PaymentError, PaymentInfo>> results = paymentApi.createPayment(account.getExternalKey(), Arrays.asList(invoice.getId().toString()));
+
+        assertEquals(results.size(), 1);
+        assertTrue(results.get(0).isLeft());
+
+        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
+
+        assertEquals(pendingNotifications.size(), 1);
+
+        Notification notification = pendingNotifications.get(0);
+        PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForInvoiceId(invoice.getId().toString());
+
+        assertNotNull(paymentAttempt);
+        assertEquals(notification.getNotificationKey(), paymentAttempt.getPaymentAttemptId().toString());
+        assertEquals(paymentAttempt.getRetryCount(), new Integer(1));
+
+        DateTime expectedRetryDate = paymentAttempt.getPaymentAttemptDate().plusDays(paymentConfig.getPaymentRetryDays().get(0));
+
+        assertEquals(notification.getEffectiveDate(), expectedRetryDate);
+        assertEquals(paymentAttempt.getNextRetryDate(), expectedRetryDate);
+    }
+
+    @Test
+    public void testRetries() throws Exception {
+        final DateTime now = new DateTime(DateTimeZone.UTC);
+        final Account account = testHelper.createTestCreditCardAccount();
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+        final BigDecimal amount = new BigDecimal("10.00");
+        final UUID subscriptionId = UUID.randomUUID();
+
+        invoice.addInvoiceItem(new RecurringInvoiceItem(invoice.getId(),
+                                                       subscriptionId,
+                                                       "test plan", "test phase",
+                                                       now,
+                                                       now.plusMonths(1),
+                                                       amount,
+                                                       new BigDecimal("1.0"),
+                                                       Currency.USD,
+                                                       new DateTime(DateTimeZone.UTC),
+                                                       new DateTime(DateTimeZone.UTC)));
+
+        DateTime nextRetryDate = new DateTime(DateTimeZone.UTC).minusDays(1);
+        DateTime paymentAttemptDate = nextRetryDate.minusDays(paymentConfig.getPaymentRetryDays().get(0));
+        PaymentAttempt paymentAttempt = new PaymentAttempt(UUID.randomUUID(), invoice).cloner()
+                                                                                      .setRetryCount(1)
+                                                                                      .setPaymentAttemptDate(paymentAttemptDate)
+                                                                                      .setNextRetryDate(nextRetryDate)
+                                                                                      .build();
+
+        paymentDao.createPaymentAttempt(paymentAttempt);
+        retryService.scheduleRetry(paymentAttempt, nextRetryDate);
+
+        // wait a little to give the queue time to process
+        Thread.sleep(paymentConfig.getNotificationSleepTimeMs() * 2);
+
+        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
+
+        assertEquals(pendingNotifications.size(), 0);
+
+        List<PaymentInfo> paymentInfos = paymentApi.getPaymentInfo(Arrays.asList(invoice.getId().toString()));
+
+        assertEquals(paymentInfos.size(), 1);
+
+        PaymentInfo paymentInfo = paymentInfos.get(0);
+
+        assertEquals(paymentInfo.getStatus(), PaymentStatus.Processed.toString());
+
+        PaymentAttempt updatedAttempt = paymentApi.getPaymentAttemptForInvoiceId(invoice.getId().toString());
+
+        assertEquals(updatedAttempt.getPaymentAttemptId(), paymentAttempt.getPaymentAttemptId());
+        assertEquals(paymentInfo.getPaymentId(), updatedAttempt.getPaymentId());
+
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 8e2aaf8..c0c88fe 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.notificationq;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -64,6 +65,12 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
+    public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
+        Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
+        dao.insertNotification(notification);
+    }
+
+    @Override
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
             final DateTime futureNotificationTime, final NotificationKey notificationKey) {
         NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index e1dcdbf..fb88d4c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -23,9 +23,18 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 
 public interface NotificationQueue {
 
-    /**
+   /**
+    *
+    * Record the need to be called back when the notification is ready
+    *
+    * @param futureNotificationTime the time at which the notification is ready
+    * @param notificationKey the key for that notification
+    */
+   public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+   /**
     *
-    *  Record from within a transaction the need to be called back when the notification is ready
+    * Record from within a transaction the need to be called back when the notification is ready
     *
     * @param transactionalDao the transactionalDao
     * @param futureNotificationTime the time at which the notification is ready
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e96d2cf..b76a8ad 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -49,9 +49,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotificationFromTransaction(
-            Transmogrifier transactionalDao, DateTime futureNotificationTime,
-            NotificationKey notificationKey) {
+    public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
         Notification notification = new DefaultNotification("MockQueue", notificationKey.toString(), futureNotificationTime);
         synchronized(notifications) {
             notifications.add(notification);
@@ -59,6 +57,24 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
+    public void recordFutureNotificationFromTransaction(
+            Transmogrifier transactionalDao, DateTime futureNotificationTime,
+            NotificationKey notificationKey) {
+        recordFutureNotification(futureNotificationTime, notificationKey);
+    }
+
+    public List<Notification> getPendingEvents() {
+        List<Notification> result = new ArrayList<Notification>();
+
+        for (Notification notification : notifications) {
+            if (notification.getProcessingState() == NotificationLifecycleState.AVAILABLE) {
+                result.add(notification);
+            }
+        }
+        return result;
+    }
+
+    @Override
     protected int doProcessEvents(int sequenceId) {
 
         int result = 0;