killbill-aplcache

Merge branch 'integration' of github.com:ning/killbill

3/2/2012 6:53:32 PM

Changes

analytics/pom.xml 10(+10 -0)

Details

analytics/pom.xml 10(+10 -0)

diff --git a/analytics/pom.xml b/analytics/pom.xml
index 10f94d4..ad46c31 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -89,6 +89,16 @@
         </dependency>
         <dependency>
             <groupId>com.ning.billing</groupId>
+            <artifactId>killbill-invoice</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing</groupId>
+            <artifactId>killbill-payment</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ning.billing</groupId>
             <artifactId>killbill-util</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index 1735062..ba9dc2a 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -22,22 +22,22 @@ import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.account.api.AccountChangeNotification;
 import com.ning.billing.account.api.AccountCreationNotification;
 import com.ning.billing.entitlement.api.user.SubscriptionTransition;
+import com.ning.billing.invoice.api.InvoiceCreationNotification;
+import com.ning.billing.payment.api.PaymentError;
+import com.ning.billing.payment.api.PaymentInfo;
 
-public class AnalyticsListener
-{
+public class AnalyticsListener {
     private final BusinessSubscriptionTransitionRecorder bstRecorder;
     private final BusinessAccountRecorder bacRecorder;
 
     @Inject
-    public AnalyticsListener(final BusinessSubscriptionTransitionRecorder bstRecorder, final BusinessAccountRecorder bacRecorder)
-    {
+    public AnalyticsListener(final BusinessSubscriptionTransitionRecorder bstRecorder, final BusinessAccountRecorder bacRecorder) {
         this.bstRecorder = bstRecorder;
         this.bacRecorder = bacRecorder;
     }
 
     @Subscribe
-    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException
-    {
+    public void handleSubscriptionTransitionChange(final SubscriptionTransition event) throws AccountApiException {
         switch (event.getTransitionType()) {
             case MIGRATE_ENTITLEMENT:
                 // TODO do nothing for now
@@ -68,18 +68,31 @@ public class AnalyticsListener
     }
 
     @Subscribe
-    public void handleAccountCreation(final AccountCreationNotification event)
-    {
+    public void handleAccountCreation(final AccountCreationNotification event) {
         bacRecorder.accountCreated(event.getData());
     }
 
     @Subscribe
-    public void handleAccountChange(final AccountChangeNotification event)
-    {
+    public void handleAccountChange(final AccountChangeNotification event) {
         if (!event.hasChanges()) {
             return;
         }
 
         bacRecorder.accountUpdated(event.getAccountId(), event.getChangedFields());
     }
+
+    @Subscribe
+    public void handleInvoice(final InvoiceCreationNotification event) {
+        bacRecorder.accountUpdated(event.getAccountId());
+    }
+
+    @Subscribe
+    public void handlePaymentInfo(final PaymentInfo paymentInfo) {
+        bacRecorder.accountUpdated(paymentInfo);
+    }
+
+    @Subscribe
+    public void handlePaymentError(final PaymentError paymentError) {
+        // TODO - we can't tie the error back to an account yet
+    }
 }
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
index f7081c7..6832a2b 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessAccountRecorder.java
@@ -22,57 +22,169 @@ import com.ning.billing.account.api.AccountData;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.account.api.ChangedField;
 import com.ning.billing.analytics.dao.BusinessAccountDao;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.api.InvoiceUserApi;
+import com.ning.billing.payment.api.PaymentApi;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
 import com.ning.billing.util.tag.Tag;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-public class BusinessAccountRecorder
-{
+public class BusinessAccountRecorder {
     private static final Logger log = LoggerFactory.getLogger(BusinessAccountRecorder.class);
 
     private final BusinessAccountDao dao;
     private final AccountUserApi accountApi;
+    private final InvoiceUserApi invoiceUserApi;
+    private final PaymentApi paymentApi;
 
     @Inject
-    public BusinessAccountRecorder(final BusinessAccountDao dao, final AccountUserApi accountApi)
-    {
+    public BusinessAccountRecorder(final BusinessAccountDao dao, final AccountUserApi accountApi, final InvoiceUserApi invoiceUserApi, final PaymentApi paymentApi) {
         this.dao = dao;
         this.accountApi = accountApi;
+        this.invoiceUserApi = invoiceUserApi;
+        this.paymentApi = paymentApi;
     }
 
-    public void accountCreated(final AccountData data)
-    {
+    public void accountCreated(final AccountData data) {
         final Account account = accountApi.getAccountByKey(data.getExternalKey());
+        final BusinessAccount bac = createBusinessAccountFromAccount(account);
 
+        log.info("ACCOUNT CREATION " + bac);
+        dao.createAccount(bac);
+    }
+
+    /**
+     * Notification handler for Account changes
+     *
+     * @param accountId     account id changed
+     * @param changedFields list of changed fields
+     */
+    public void accountUpdated(final UUID accountId, final List<ChangedField> changedFields) {
+        // None of the fields updated interest us so far - see DefaultAccountChangeNotification
+        // TODO We'll need notifications for tags changes eventually
+    }
+
+    /**
+     * Notification handler for Payment creations
+     *
+     * @param paymentInfo payment object (from the payment plugin)
+     */
+    public void accountUpdated(final PaymentInfo paymentInfo) {
+        final PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForPaymentId(paymentInfo.getPaymentId());
+        if (paymentAttempt == null) {
+            return;
+        }
+
+        final Account account = accountApi.getAccountById(paymentAttempt.getAccountId());
+        if (account == null) {
+            return;
+        }
+
+        accountUpdated(account.getId());
+    }
+
+    /**
+     * Notification handler for Invoice creations
+     *
+     * @param accountId account id associated with the created invoice
+     */
+    public void accountUpdated(final UUID accountId) {
+        final Account account = accountApi.getAccountById(accountId);
+
+        if (account == null) {
+            log.warn("Couldn't find account {}", accountId);
+            return;
+        }
+
+        BusinessAccount bac = dao.getAccount(account.getExternalKey());
+        if (bac == null) {
+            bac = createBusinessAccountFromAccount(account);
+            log.info("ACCOUNT CREATION " + bac);
+            dao.createAccount(bac);
+        } else {
+            updateBusinessAccountFromAccount(account, bac);
+            log.info("ACCOUNT UPDATE " + bac);
+            dao.saveAccount(bac);
+        }
+    }
+
+    private BusinessAccount createBusinessAccountFromAccount(final Account account) {
         final List<String> tags = new ArrayList<String>();
         for (final Tag tag : account.getTagList()) {
             tags.add(tag.getTagDefinitionName());
         }
 
-        // TODO Need payment and invoice api to fill most fields
         final BusinessAccount bac = new BusinessAccount(
-            account.getExternalKey(),
-            null, // TODO
-            tags,
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null, // TODO
-            null // TODO
+                account.getExternalKey(),
+                invoiceUserApi.getAccountBalance(account.getId()),
+                tags,
+                // These fields will be updated below
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
         );
+        updateBusinessAccountFromAccount(account, bac);
 
-        log.info("ACCOUNT CREATION " + bac);
-        dao.createAccount(bac);
+        return bac;
     }
 
-    public void accountUpdated(final UUID accountId, final List<ChangedField> changedFields)
-    {
-        // None of the fields updated interest us so far - see DefaultAccountChangeNotification
-        // TODO We'll need notifications for tags changes eventually
+    private void updateBusinessAccountFromAccount(final Account account, final BusinessAccount bac) {
+        DateTime lastInvoiceDate = null;
+        BigDecimal totalInvoiceBalance = BigDecimal.ZERO;
+        String lastPaymentStatus = null;
+        String paymentMethod = null;
+        String creditCardType = null;
+        String billingAddressCountry = null;
+
+        // Retrieve invoices information
+        final List<Invoice> invoices = invoiceUserApi.getInvoicesByAccount(account.getId());
+        if (invoices != null && invoices.size() > 0) {
+            final List<String> invoiceIds = new ArrayList<String>();
+            for (final Invoice invoice : invoices) {
+                invoiceIds.add(invoice.getId().toString());
+                totalInvoiceBalance = totalInvoiceBalance.add(invoice.getBalance());
+
+                if (lastInvoiceDate == null || invoice.getInvoiceDate().isAfter(lastInvoiceDate)) {
+                    lastInvoiceDate = invoice.getInvoiceDate();
+                }
+            }
+
+            // Retrieve payments information for these invoices
+            DateTime lastPaymentDate = null;
+            final List<PaymentInfo> payments = paymentApi.getPaymentInfo(invoiceIds);
+            if (payments != null) {
+                for (final PaymentInfo payment : payments) {
+                    // Use the last payment method/type/country as the default one for the account
+                    if (lastPaymentDate == null || payment.getCreatedDate().isAfter(lastPaymentDate)) {
+                        lastPaymentDate = payment.getCreatedDate();
+
+                        lastPaymentStatus = payment.getStatus();
+                        paymentMethod = payment.getPaymentMethod();
+                        creditCardType = payment.getCardType();
+                        billingAddressCountry = payment.getCardCountry();
+                    }
+                }
+            }
+        }
+
+        bac.setLastPaymentStatus(lastPaymentStatus);
+        bac.setPaymentMethod(paymentMethod);
+        bac.setCreditCardType(creditCardType);
+        bac.setBillingAddressCountry(billingAddressCountry);
+        bac.setLastInvoiceDate(lastInvoiceDate);
+        bac.setTotalInvoiceBalance(totalInvoiceBalance);
+
+        bac.setBalance(invoiceUserApi.getAccountBalance(account.getId()));
     }
 }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
index 25496b5..5c7ffae 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.analytics;
 
+import com.ning.billing.invoice.glue.InvoiceModule;
+import com.ning.billing.payment.setup.PaymentModule;
 import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.account.glue.AccountModule;
 import com.ning.billing.analytics.setup.AnalyticsModule;
@@ -41,6 +43,8 @@ public class AnalyticsTestModule extends AnalyticsModule
         install(new CatalogModule());
         install(new BusModule());
         install(new EntitlementModule());
+        install(new InvoiceModule());
+        install(new PaymentModule());
         install(new ClockModule());
         install(new TagStoreModule());
         install(new NotificationQueueModule());
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index c413f0d..adf685f 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -47,14 +47,23 @@ import com.ning.billing.entitlement.api.user.SubscriptionTransition;
 import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
+import com.ning.billing.invoice.api.InvoiceCreationNotification;
+import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
+import com.ning.billing.invoice.dao.InvoiceDao;
+import com.ning.billing.invoice.model.DefaultInvoice;
+import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.tag.DescriptiveTag;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.DefaultClock;
 import com.ning.billing.util.tag.DefaultTagDefinition;
+import com.ning.billing.util.tag.DescriptiveTag;
 import com.ning.billing.util.tag.Tag;
 import com.ning.billing.util.tag.dao.TagDefinitionSqlDao;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -62,21 +71,28 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
 import static org.testng.Assert.fail;
 
 @Guice(modules = AnalyticsTestModule.class)
-public class TestAnalyticsService
-{
+public class TestAnalyticsService {
     private static final UUID ID = UUID.randomUUID();
     private static final String KEY = "12345";
     private static final String ACCOUNT_KEY = "pierre-12345";
+    private static final Currency ACCOUNT_CURRENCY = Currency.EUR;
     private static final DefaultTagDefinition TAG_ONE = new DefaultTagDefinition("batch20", "something", "pierre");
     private static final DefaultTagDefinition TAG_TWO = new DefaultTagDefinition("awesome", "something", "pierre");
+    private static final BigDecimal INVOICE_AMOUNT = BigDecimal.valueOf(1243.11);
+    private static final String PAYMENT_METHOD = "Paypal";
+    private static final String CARD_COUNTRY = "France";
+
+    private final Clock clock = new DefaultClock();
 
     @Inject
     private AccountUserApi accountApi;
@@ -88,6 +104,12 @@ public class TestAnalyticsService
     private TagDefinitionSqlDao tagDao;
 
     @Inject
+    private InvoiceDao invoiceDao;
+
+    @Inject
+    private PaymentDao paymentDao;
+
+    @Inject
     private DefaultAnalyticsService service;
 
     @Inject
@@ -106,50 +128,54 @@ public class TestAnalyticsService
     private BusinessSubscriptionTransition expectedTransition;
 
     private AccountCreationNotification accountCreationNotification;
+    private InvoiceCreationNotification invoiceCreationNotification;
+    private PaymentInfo paymentInfoNotification;
 
     @BeforeClass(alwaysRun = true)
-    public void startMysql() throws IOException, ClassNotFoundException, SQLException, EntitlementUserApiException
-    {
+    public void startMysql() throws IOException, ClassNotFoundException, SQLException, EntitlementUserApiException {
         // Killbill generic setup
         setupBusAndMySQL();
 
         tagDao.create(TAG_ONE);
         tagDao.create(TAG_TWO);
 
-        final MockAccount account = new MockAccount(UUID.randomUUID(), ACCOUNT_KEY, Currency.USD);
+        final MockAccount account = new MockAccount(UUID.randomUUID(), ACCOUNT_KEY, ACCOUNT_CURRENCY);
         try {
-            List<Tag> tags = new ArrayList<Tag>();
-            tags.add(new DescriptiveTag(TAG_ONE, "pierre", new DateTime(DateTimeZone.UTC)));
-            tags.add(new DescriptiveTag(TAG_TWO, "pierre", new DateTime(DateTimeZone.UTC)));
+            final List<Tag> tags = new ArrayList<Tag>();
+            tags.add(new DescriptiveTag(TAG_ONE, "pierre", clock.getUTCNow()));
+            tags.add(new DescriptiveTag(TAG_TWO, "pierre", clock.getUTCNow()));
 
             final Account storedAccount = accountApi.createAccount(account, null, tags);
 
             // Create events for the bus and expected results
             createSubscriptionTransitionEvent(storedAccount);
             createAccountCreationEvent(storedAccount);
+            createInvoiceAndPaymentCreationEvents(storedAccount);
         } catch (Throwable t) {
             fail("Initializing accounts failed.", t);
         }
     }
 
-    private void setupBusAndMySQL() throws IOException
-    {
+    private void setupBusAndMySQL() throws IOException {
         bus.start();
 
         final String analyticsDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/analytics/ddl.sql"));
         final String accountDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
         final String entitlementDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/entitlement/ddl.sql"));
+        final String invoiceDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/invoice/ddl.sql"));
+        final String paymentDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/payment/ddl.sql"));
         final String utilDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
 
         helper.startMysql();
         helper.initDb(analyticsDdl);
         helper.initDb(accountDdl);
         helper.initDb(entitlementDdl);
+        helper.initDb(invoiceDdl);
+        helper.initDb(paymentDdl);
         helper.initDb(utilDdl);
     }
 
-    private void createSubscriptionTransitionEvent(final Account account) throws EntitlementUserApiException
-    {
+    private void createSubscriptionTransitionEvent(final Account account) throws EntitlementUserApiException {
         final SubscriptionBundle bundle = entitlementApi.createBundleForAccount(account.getId(), KEY);
 
         // Verify we correctly initialized the account subsystem
@@ -161,63 +187,85 @@ public class TestAnalyticsService
         final Plan plan = new MockPlan("platinum-monthly", product);
         final PlanPhase phase = new MockPhase(PhaseType.EVERGREEN, plan, MockDuration.UNLIMITED(), 25.95);
         final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveTransitionTime = new DateTime(DateTimeZone.UTC);
-        final DateTime requestedTransitionTime = new DateTime(DateTimeZone.UTC);
+        final DateTime effectiveTransitionTime = clock.getUTCNow();
+        final DateTime requestedTransitionTime = clock.getUTCNow();
         final String priceList = "something";
 
         transition = new SubscriptionTransitionData(
-            ID,
-            subscriptionId,
-            bundle.getId(),
-            EntitlementEvent.EventType.API_USER,
-            ApiEventType.CREATE,
-            requestedTransitionTime,
-            effectiveTransitionTime,
-            null,
-            null,
-            null,
-            null,
-            Subscription.SubscriptionState.ACTIVE,
-            plan,
-            phase,
-            priceList
+                ID,
+                subscriptionId,
+                bundle.getId(),
+                EntitlementEvent.EventType.API_USER,
+                ApiEventType.CREATE,
+                requestedTransitionTime,
+                effectiveTransitionTime,
+                null,
+                null,
+                null,
+                null,
+                Subscription.SubscriptionState.ACTIVE,
+                plan,
+                phase,
+                priceList
         );
         expectedTransition = new BusinessSubscriptionTransition(
-            ID,
-            KEY,
-            ACCOUNT_KEY,
-            requestedTransitionTime,
-            BusinessSubscriptionEvent.subscriptionCreated(plan),
-            null,
-            new BusinessSubscription(priceList, plan, phase, Currency.USD, effectiveTransitionTime, Subscription.SubscriptionState.ACTIVE, subscriptionId, bundle.getId())
+                ID,
+                KEY,
+                ACCOUNT_KEY,
+                requestedTransitionTime,
+                BusinessSubscriptionEvent.subscriptionCreated(plan),
+                null,
+                new BusinessSubscription(priceList, plan, phase, ACCOUNT_CURRENCY, effectiveTransitionTime, Subscription.SubscriptionState.ACTIVE, subscriptionId, bundle.getId())
         );
     }
 
-    private void createAccountCreationEvent(final Account account)
-    {
+    private void createAccountCreationEvent(final Account account) {
         accountCreationNotification = new DefaultAccountCreationEvent(account);
     }
 
+    private void createInvoiceAndPaymentCreationEvents(final Account account) {
+        final DefaultInvoice invoice = new DefaultInvoice(account.getId(), clock.getUTCNow(), ACCOUNT_CURRENCY, clock);
+        final FixedPriceInvoiceItem invoiceItem = new FixedPriceInvoiceItem(
+                UUID.randomUUID(), invoice.getId(), UUID.randomUUID(), "somePlan", "somePhase", clock.getUTCNow(), clock.getUTCNow().plusDays(1),
+                INVOICE_AMOUNT, ACCOUNT_CURRENCY, clock.getUTCNow(), clock.getUTCNow()
+        );
+        invoice.addInvoiceItem(invoiceItem);
+
+        invoiceDao.create(invoice);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(account.getId()).size(), 1);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(account.getId()).get(0).getInvoiceItems().size(), 1);
+
+        // It doesn't really matter what the events contain - the listener will go back to the db
+        invoiceCreationNotification = new DefaultInvoiceCreationNotification(invoice.getId(), account.getId(),
+                INVOICE_AMOUNT, ACCOUNT_CURRENCY, clock.getUTCNow());
+
+        paymentInfoNotification = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString()).setPaymentMethod(PAYMENT_METHOD).setCardCountry(CARD_COUNTRY).build();
+        final PaymentAttempt paymentAttempt = new PaymentAttempt(UUID.randomUUID(), invoice.getId(), account.getId(), BigDecimal.TEN,
+                ACCOUNT_CURRENCY, clock.getUTCNow(), clock.getUTCNow(), paymentInfoNotification.getPaymentId(), 1, clock.getUTCNow().plusDays(1));
+        paymentDao.createPaymentAttempt(paymentAttempt);
+        paymentDao.savePaymentInfo(paymentInfoNotification);
+        Assert.assertEquals(paymentDao.getPaymentInfo(Arrays.asList(invoice.getId().toString())).size(), 1);
+    }
+
     @AfterClass(alwaysRun = true)
-    public void stopMysql()
-    {
+    public void stopMysql() {
         helper.stopMysql();
     }
 
     @Test(groups = "slow")
-    public void testRegisterForNotifications() throws Exception
-    {
+    public void testRegisterForNotifications() throws Exception {
         // Make sure the service has been instantiated
         Assert.assertEquals(service.getName(), "analytics-service");
 
         // Test the bus and make sure we can register our service
         try {
             service.registerForNotifications();
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
             Assert.fail("Unable to start the bus or service! " + t);
         }
 
+        Assert.assertNull(accountDao.getAccount(ACCOUNT_KEY));
+
         // Send events and wait for the async part...
         bus.post(transition);
         bus.post(accountCreationNotification);
@@ -231,11 +279,24 @@ public class TestAnalyticsService
         Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTags().indexOf(TAG_ONE.getName()) != -1);
         Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTags().indexOf(TAG_TWO.getName()) != -1);
 
+        // Test invoice integration - the account creation notification has triggered a BAC update
+        Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT) == 0);
+
+        // Post the same invoice event again - the invoice balance shouldn't change
+        bus.post(invoiceCreationNotification);
+        Thread.sleep(1000);
+        Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT) == 0);
+
+        // Test payment integration - the fields have already been populated, just make sure the code is exercised
+        bus.post(paymentInfoNotification);
+        Thread.sleep(1000);
+        Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getPaymentMethod(), PAYMENT_METHOD);
+        Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getBillingAddressCountry(), CARD_COUNTRY);
+
         // Test the shutdown sequence
         try {
             bus.stop();
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
             Assert.fail("Unable to stop the bus!");
         }
     }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index fd84927..57c76a0 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -39,7 +39,7 @@ public interface PaymentApi {
 
     List<Either<PaymentError, PaymentInfo>> createPayment(String accountKey, List<String> invoiceIds);
     List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds);
-    Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId);
+    Either<PaymentError, PaymentInfo> createPaymentForPaymentAttempt(UUID paymentAttemptId);
 
     List<Either<PaymentError, PaymentInfo>> createRefund(Account account, List<String> invoiceIds); //TODO
 
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentError.java b/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
index f1474c8..d9b8c49 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentError.java
@@ -15,6 +15,8 @@
  */
 
 package com.ning.billing.payment.api;
+import java.util.UUID;
+
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 import org.codehaus.jackson.annotate.JsonTypeInfo.Id;
 
@@ -24,15 +26,21 @@ import com.ning.billing.util.bus.BusEvent;
 public class PaymentError implements BusEvent {
     private final String type;
     private final String message;
+    private final UUID accountId;
+    private final UUID invoiceId;
 
-    public PaymentError(PaymentError src) {
+    public PaymentError(PaymentError src, UUID accountId, UUID invoiceId) {
         this.type = src.type;
         this.message = src.message;
+        this.accountId = accountId;
+        this.invoiceId = invoiceId;
     }
 
-    public PaymentError(String type, String message) {
+    public PaymentError(String type, String message, UUID accountId, UUID invoiceId) {
         this.type = type;
         this.message = message;
+        this.accountId = accountId;
+        this.invoiceId = invoiceId;
     }
 
     public String getType() {
@@ -43,10 +51,20 @@ public class PaymentError implements BusEvent {
         return message;
     }
 
+    public UUID getInvoiceId() {
+        return invoiceId;
+    }
+
+    public UUID getAccountId() {
+        return accountId;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
+        result = prime * result + ((accountId == null) ? 0 : accountId.hashCode());
+        result = prime * result + ((invoiceId == null) ? 0 : invoiceId.hashCode());
         result = prime * result + ((message == null) ? 0 : message.hashCode());
         result = prime * result + ((type == null) ? 0 : type.hashCode());
         return result;
@@ -61,6 +79,18 @@ public class PaymentError implements BusEvent {
         if (getClass() != obj.getClass())
             return false;
         PaymentError other = (PaymentError) obj;
+        if (accountId == null) {
+            if (other.accountId != null)
+                return false;
+        }
+        else if (!accountId.equals(other.accountId))
+            return false;
+        if (invoiceId == null) {
+            if (other.invoiceId != null)
+                return false;
+        }
+        else if (!invoiceId.equals(other.invoiceId))
+            return false;
         if (message == null) {
             if (other.message != null)
                 return false;
@@ -78,6 +108,7 @@ public class PaymentError implements BusEvent {
 
     @Override
     public String toString() {
-        return "PaymentError [type=" + type + ", message=" + message + "]";
+        return "PaymentError [type=" + type + ", message=" + message + ", accountId=" + accountId + ", invoiceId=" + invoiceId + "]";
     }
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index 3b1a6f9..4dd5040 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -25,7 +25,6 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +34,7 @@ import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
 import com.ning.billing.invoice.model.DefaultInvoicePayment;
+import com.ning.billing.payment.RetryService;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.PaymentProviderPlugin;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
@@ -44,6 +44,7 @@ public class DefaultPaymentApi implements PaymentApi {
     private final PaymentProviderPluginRegistry pluginRegistry;
     private final AccountUserApi accountUserApi;
     private final InvoicePaymentApi invoicePaymentApi;
+    private final RetryService retryService;
     private final PaymentDao paymentDao;
     private final PaymentConfig config;
 
@@ -53,11 +54,13 @@ public class DefaultPaymentApi implements PaymentApi {
     public DefaultPaymentApi(PaymentProviderPluginRegistry pluginRegistry,
                              AccountUserApi accountUserApi,
                              InvoicePaymentApi invoicePaymentApi,
+                             RetryService retryService,
                              PaymentDao paymentDao,
                              PaymentConfig config) {
         this.pluginRegistry = pluginRegistry;
         this.accountUserApi = accountUserApi;
         this.invoicePaymentApi = invoicePaymentApi;
+        this.retryService = retryService;
         this.paymentDao = paymentDao;
         this.config = config;
     }
@@ -134,7 +137,7 @@ public class DefaultPaymentApi implements PaymentApi {
     }
 
     @Override
-    public Either<PaymentError, PaymentInfo> createPayment(UUID paymentAttemptId) {
+    public Either<PaymentError, PaymentInfo> createPaymentForPaymentAttempt(UUID paymentAttemptId) {
         PaymentAttempt paymentAttempt = paymentDao.getPaymentAttemptById(paymentAttemptId);
 
         if (paymentAttempt != null) {
@@ -142,17 +145,23 @@ public class DefaultPaymentApi implements PaymentApi {
             Account account = accountUserApi.getAccountById(paymentAttempt.getAccountId());
 
             if (invoice != null && account != null) {
-                if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
+                if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
                     // TODO: send a notification that invoice was ignored?
                     log.info("Received invoice for payment with outstanding amount of 0 {} ", invoice);
-                    Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
+                    return Either.left(new PaymentError("invoice_balance_0",
+                                                        "Invoice balance was 0 or less",
+                                                        paymentAttempt.getAccountId(),
+                                                        paymentAttempt.getInvoiceId()));
                 }
                 else {
                     return processPayment(getPaymentProviderPlugin(account), account, invoice, paymentAttempt);
                 }
             }
         }
-        return Either.left(new PaymentError("retry_payment_error", "Could not load payment attempt, invoice or account for id " + paymentAttemptId));
+        return Either.left(new PaymentError("retry_payment_error",
+                                            "Could not load payment attempt, invoice or account for id " + paymentAttemptId,
+                                            paymentAttempt.getAccountId(),
+                                            paymentAttempt.getInvoiceId()));
     }
 
     @Override
@@ -164,10 +173,14 @@ public class DefaultPaymentApi implements PaymentApi {
         for (String invoiceId : invoiceIds) {
             Invoice invoice = invoicePaymentApi.getInvoice(UUID.fromString(invoiceId));
 
-            if (invoice.getBalance().compareTo(BigDecimal.ZERO) == 0 ) {
+            if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
                 // TODO: send a notification that invoice was ignored?
                 log.info("Received invoice for payment with balance of 0 {} ", invoice);
-                Either.left(new PaymentError("invoice_balance_0", "Invoice balance was 0"));
+                Either<PaymentError, PaymentInfo> result = Either.left(new PaymentError("invoice_balance_0",
+                                                                                        "Invoice balance was 0 or less",
+                                                                                        account.getId(),
+                                                                                        UUID.fromString(invoiceId)));
+                processedPaymentsOrErrors.add(result);
             }
             else {
                 PaymentAttempt paymentAttempt = paymentDao.createPaymentAttempt(invoice);
@@ -234,7 +247,7 @@ public class DefaultPaymentApi implements PaymentApi {
 
         if (retryCount < retryDays.size()) {
             int retryInDays = 0;
-            DateTime nextRetryDate = new DateTime(DateTimeZone.UTC);
+            DateTime nextRetryDate = paymentAttempt.getPaymentAttemptDate();
 
             try {
                 retryInDays = retryDays.get(retryCount);
@@ -244,6 +257,7 @@ public class DefaultPaymentApi implements PaymentApi {
                 log.error("Could not get retry day for retry count {}", retryCount);
             }
 
+            retryService.scheduleRetry(paymentAttempt, nextRetryDate);
             paymentDao.updatePaymentAttemptWithRetryInfo(paymentAttempt.getPaymentAttemptId(), retryCount + 1, nextRetryDate);
         }
         else if (retryCount == retryDays.size()) {
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
index b505fa0..49a4165 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/DefaultPaymentDao.java
@@ -20,14 +20,15 @@ import java.util.Date;
 import java.util.List;
 import java.util.UUID;
 
-import com.ning.billing.util.clock.Clock;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.util.clock.Clock;
 
 public class DefaultPaymentDao implements PaymentDao {
     private final PaymentSqlDao sqlDao;
@@ -80,16 +81,25 @@ public class DefaultPaymentDao implements PaymentDao {
 
     @Override
     public List<PaymentInfo> getPaymentInfo(List<String> invoiceIds) {
-        return sqlDao.getPaymentInfos(invoiceIds);
+        if (invoiceIds == null || invoiceIds.size() == 0) {
+            return ImmutableList.<PaymentInfo>of();
+        } else {
+            return sqlDao.getPaymentInfos(invoiceIds);
+        }
     }
 
     @Override
     public List<PaymentAttempt> getPaymentAttemptsForInvoiceIds(List<String> invoiceIds) {
-        return sqlDao.getPaymentAttemptsForInvoiceIds(invoiceIds);
+        if (invoiceIds == null || invoiceIds.size() == 0) {
+            return ImmutableList.<PaymentAttempt>of();
+        } else {
+            return sqlDao.getPaymentAttemptsForInvoiceIds(invoiceIds);
+        }
     }
 
     @Override
     public void updatePaymentAttemptWithRetryInfo(UUID paymentAttemptId, int retryCount, DateTime nextRetryDate) {
+
         final Date retryDate = nextRetryDate == null ? null : nextRetryDate.toDate();
         sqlDao.updatePaymentAttemptWithRetryInfo(paymentAttemptId.toString(), retryCount, retryDate, clock.getUTCNow().toDate());
     }
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
index 71c7e41..280d1e0 100644
--- a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
+++ b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
@@ -53,7 +53,10 @@ public class NoOpPaymentProviderPlugin implements PaymentProviderPlugin {
 
     @Override
     public Either<PaymentError, String> createPaymentProviderAccount(Account account) {
-        return Either.left(new PaymentError("unsupported", "Account creation not supported in this plugin"));
+        return Either.left(new PaymentError("unsupported",
+                                            "Account creation not supported in this plugin",
+                                            account.getId(),
+                                            null));
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/RetryService.java b/payment/src/main/java/com/ning/billing/payment/RetryService.java
index bbe8a61..ee57397 100644
--- a/payment/src/main/java/com/ning/billing/payment/RetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/RetryService.java
@@ -19,7 +19,6 @@ package com.ning.billing.payment;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.google.inject.Inject;
 import com.ning.billing.lifecycle.KillbillService;
@@ -82,7 +81,7 @@ public class RetryService implements KillbillService {
          }
     }
 
-    public void scheduleRetry(Transmogrifier transactionalDao, PaymentAttempt paymentAttempt, DateTime timeOfRetry) {
+    public void scheduleRetry(PaymentAttempt paymentAttempt, DateTime timeOfRetry) {
         final String id = paymentAttempt.getPaymentAttemptId().toString();
 
         NotificationKey key = new NotificationKey() {
@@ -91,14 +90,14 @@ public class RetryService implements KillbillService {
                 return id;
             }
         };
-        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key);
+        retryQueue.recordFutureNotification(timeOfRetry, key);
     }
 
     private void retry(String paymentAttemptId) {
         PaymentInfo paymentInfo = paymentApi.getPaymentInfoForPaymentAttemptId(paymentAttemptId);
 
-        if (paymentInfo != null && !PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
-            paymentApi.createPayment(UUID.fromString(paymentAttemptId));
+        if (paymentInfo == null || !PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
+            paymentApi.createPaymentForPaymentAttempt(UUID.fromString(paymentAttemptId));
         }
     }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
index 85641ed..d3d9320 100644
--- a/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/setup/PaymentModule.java
@@ -64,5 +64,6 @@ public class PaymentModule extends AbstractModule {
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
         installPaymentDao();
+        installRetryEngine();
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 375bcfd..dee2a28 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
@@ -39,24 +40,34 @@ import com.ning.billing.payment.api.PaymentProviderAccount;
 import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
 
 public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
+    private final AtomicBoolean makeNextInvoiceFail = new AtomicBoolean(false);
     private final Map<String, PaymentInfo> payments = new ConcurrentHashMap<String, PaymentInfo>();
     private final Map<String, PaymentProviderAccount> accounts = new ConcurrentHashMap<String, PaymentProviderAccount>();
     private final Map<String, PaymentMethodInfo> paymentMethods = new ConcurrentHashMap<String, PaymentMethodInfo>();
 
+    public void makeNextInvoiceFail() {
+        makeNextInvoiceFail.set(true);
+    }
+
     @Override
     public Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice) {
-        PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
-                                             .setAmount(invoice.getBalance())
-                                             .setStatus("Processed")
-                                             .setBankIdentificationNumber("1234")
-                                             .setCreatedDate(new DateTime())
-                                             .setEffectiveDate(new DateTime())
-                                             .setPaymentNumber("12345")
-                                             .setReferenceId("12345")
-                                             .setType("Electronic")
-                                             .build();
-        payments.put(payment.getPaymentId(), payment);
-        return Either.right(payment);
+        if (makeNextInvoiceFail.getAndSet(false)) {
+            return Either.left(new PaymentError("unknown", "test error", account.getId(), invoice.getId()));
+        }
+        else {
+            PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
+                                                 .setAmount(invoice.getBalance())
+                                                 .setStatus("Processed")
+                                                 .setBankIdentificationNumber("1234")
+                                                 .setCreatedDate(new DateTime())
+                                                 .setEffectiveDate(new DateTime())
+                                                 .setPaymentNumber("12345")
+                                                 .setReferenceId("12345")
+                                                 .setType("Electronic")
+                                                 .build();
+            payments.put(payment.getPaymentId(), payment);
+            return Either.right(payment);
+        }
     }
 
     @Override
@@ -64,7 +75,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
         PaymentInfo payment = payments.get(paymentId);
 
         if (payment == null) {
-            return Either.left(new PaymentError("notfound", "No payment found for id " + paymentId));
+            return Either.left(new PaymentError("notfound", "No payment found for id " + paymentId, null, null));
         }
         else {
             return Either.right(payment);
@@ -83,7 +94,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             return Either.right(id);
         }
         else {
-            return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account"));
+            return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account", null, null));
         }
     }
 
@@ -93,7 +104,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             return Either.right(accounts.get(accountKey));
         }
         else {
-            return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey));
+            return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey, null, null));
         }
     }
 
@@ -125,7 +136,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                     realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setId(paymentMethodId).build();
                 }
                 if (realPaymentMethod == null) {
-                    return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+                    return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin", null, null));
                 }
                 else {
                     if (shouldBeDefault) {
@@ -136,11 +147,11 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                 }
             }
                 else {
-                    return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey));
+                    return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey, null, null));
                 }
         }
         else {
-            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey, null, null));
         }
     }
 
@@ -184,7 +195,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
                 realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).build();
             }
             if (realPaymentMethod == null) {
-                return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+                return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin", null, null));
             }
             else {
                 paymentMethods.put(paymentMethod.getId(), paymentMethod);
@@ -192,7 +203,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
             }
         }
         else {
-            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey, null, null));
         }
     }
 
@@ -202,11 +213,11 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
         if (paymentMethodInfo != null) {
             if (Boolean.FALSE.equals(paymentMethodInfo.getDefaultMethod()) || paymentMethodInfo.getDefaultMethod() == null) {
                 if (paymentMethods.remove(paymentMethodId) == null) {
-                    return Either.left(new PaymentError("unknown", "Did not get any result back"));
+                    return Either.left(new PaymentError("unknown", "Did not get any result back", null, null));
                 }
             }
             else {
-                return Either.left(new PaymentError("error", "Cannot delete default payment method"));
+                return Either.left(new PaymentError("error", "Cannot delete default payment method", null, null));
             }
         }
         return Either.right(null);
@@ -215,7 +226,7 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     @Override
     public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
         if (paymentMethodId == null) {
-            return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId));
+            return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId, null, null));
         }
 
         return Either.right(paymentMethods.get(paymentMethodId));
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
index 31fdae3..97aa31e 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithEmbeddedDb.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.payment.setup;
 
-import com.ning.billing.util.bus.Bus;
 import org.apache.commons.collections.MapUtils;
 
 import com.google.common.collect.ImmutableMap;
@@ -24,8 +23,10 @@ import com.google.inject.Provider;
 import com.ning.billing.entitlement.api.billing.EntitlementBillingApi;
 import com.ning.billing.mock.BrainDeadProxyFactory;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
-import com.ning.billing.payment.setup.PaymentTestModuleWithMocks.MockProvider;
+import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.InMemoryBus;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
 	public static class MockProvider implements Provider<EntitlementBillingApi> {
@@ -33,9 +34,9 @@ public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
 		public EntitlementBillingApi get() {
 			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
 		}
-		
+
 	}
-	
+
 	public PaymentTestModuleWithEmbeddedDb() {
         super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
     }
@@ -49,6 +50,7 @@ public class PaymentTestModuleWithEmbeddedDb extends PaymentModule {
     protected void configure() {
         super.configure();
         bind(Bus.class).to(InMemoryBus.class).asEagerSingleton();
-        bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
+        bind(EntitlementBillingApi.class).toProvider(MockProvider.class);
+        bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
index 3dfc637..c8f79bc 100644
--- a/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/setup/PaymentTestModuleWithMocks.java
@@ -31,6 +31,8 @@ import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.InMemoryBus;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class PaymentTestModuleWithMocks extends PaymentModule {
 	public static class MockProvider implements Provider<EntitlementBillingApi> {
@@ -38,12 +40,13 @@ public class PaymentTestModuleWithMocks extends PaymentModule {
 		public EntitlementBillingApi get() {
 			return BrainDeadProxyFactory.createBrainDeadProxyFor(EntitlementBillingApi.class);
 		}
-		
+
 	}
-	
-	
+
+
     public PaymentTestModuleWithMocks() {
-        super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock")));
+        super(MapUtils.toProperties(ImmutableMap.of("killbill.payment.provider.default", "my-mock",
+                                                    "killbill.payment.engine.events.off", "false")));
     }
 
     @Override
@@ -65,5 +68,6 @@ public class PaymentTestModuleWithMocks extends PaymentModule {
         bind(MockInvoiceDao.class).asEagerSingleton();
         bind(InvoiceDao.class).to(MockInvoiceDao.class);
         bind(EntitlementBillingApi.class).toProvider( MockProvider.class );
+        bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
index c1e62d8..888e017 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestPaymentInvoiceIntegration.java
@@ -26,7 +26,6 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.concurrent.Callable;
 
-import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
@@ -46,6 +45,7 @@ import com.ning.billing.account.glue.AccountModule;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.api.InvoicePaymentApi;
+import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentError;
@@ -54,6 +54,7 @@ import com.ning.billing.payment.setup.PaymentTestModuleWithEmbeddedDb;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.Bus.EventBusException;
 import com.ning.billing.util.clock.MockClockModule;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class TestPaymentInvoiceIntegration {
     // create payment for received invoice and save it -- positive and negative
@@ -69,6 +70,8 @@ public class TestPaymentInvoiceIntegration {
     private PaymentApi paymentApi;
     @Inject
     private TestHelper testHelper;
+    @Inject
+    private NotificationQueueService notificationQueueService;
 
     private MockPaymentInfoReceiver paymentInfoReceiver;
 
diff --git a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
new file mode 100644
index 0000000..6a08eb5
--- /dev/null
+++ b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.payment;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.glue.AccountModuleWithMocks;
+import com.ning.billing.catalog.api.Currency;
+import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
+import com.ning.billing.invoice.model.RecurringInvoiceItem;
+import com.ning.billing.payment.api.Either;
+import com.ning.billing.payment.api.PaymentApi;
+import com.ning.billing.payment.api.PaymentAttempt;
+import com.ning.billing.payment.api.PaymentError;
+import com.ning.billing.payment.api.PaymentInfo;
+import com.ning.billing.payment.api.PaymentStatus;
+import com.ning.billing.payment.dao.PaymentDao;
+import com.ning.billing.payment.provider.MockPaymentProviderPlugin;
+import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
+import com.ning.billing.payment.setup.PaymentConfig;
+import com.ning.billing.payment.setup.PaymentTestModuleWithMocks;
+import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.notificationq.MockNotificationQueue;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+@Guice(modules = { PaymentTestModuleWithMocks.class, AccountModuleWithMocks.class, InvoiceModuleWithMocks.class })
+@Test(groups = "fast")
+public class TestRetryService {
+    @Inject
+    private PaymentConfig paymentConfig;
+    @Inject
+    private Bus eventBus;
+    @Inject
+    private PaymentApi paymentApi;
+    @Inject
+    private TestHelper testHelper;
+    @Inject
+    private PaymentProviderPluginRegistry registry;
+    @Inject
+    private PaymentDao paymentDao;
+    @Inject
+    private RetryService retryService;
+    @Inject
+    private NotificationQueueService notificationQueueService;
+
+    private MockPaymentProviderPlugin mockPaymentProviderPlugin;
+    private MockNotificationQueue mockNotificationQueue;
+
+    @BeforeClass(alwaysRun = true)
+    public void initialize() throws Exception {
+        retryService.initialize();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() throws Exception {
+        eventBus.start();
+        retryService.start();
+
+        mockPaymentProviderPlugin = (MockPaymentProviderPlugin)registry.getPlugin(null);
+        mockNotificationQueue = (MockNotificationQueue)notificationQueueService.getNotificationQueue(RetryService.SERVICE_NAME, RetryService.QUEUE_NAME);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        retryService.stop();
+        eventBus.stop();
+    }
+
+    @Test
+    public void testSchedulesRetry() throws Exception {
+        final DateTime now = new DateTime(DateTimeZone.UTC);
+        final Account account = testHelper.createTestCreditCardAccount();
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+        final BigDecimal amount = new BigDecimal("10.00");
+        final UUID subscriptionId = UUID.randomUUID();
+
+        invoice.addInvoiceItem(new RecurringInvoiceItem(invoice.getId(),
+                                                       subscriptionId,
+                                                       "test plan", "test phase",
+                                                       now,
+                                                       now.plusMonths(1),
+                                                       amount,
+                                                       new BigDecimal("1.0"),
+                                                       Currency.USD,
+                                                       new DateTime(DateTimeZone.UTC),
+                                                       new DateTime(DateTimeZone.UTC)));
+
+        mockPaymentProviderPlugin.makeNextInvoiceFail();
+
+        List<Either<PaymentError, PaymentInfo>> results = paymentApi.createPayment(account.getExternalKey(), Arrays.asList(invoice.getId().toString()));
+
+        assertEquals(results.size(), 1);
+        assertTrue(results.get(0).isLeft());
+
+        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
+
+        assertEquals(pendingNotifications.size(), 1);
+
+        Notification notification = pendingNotifications.get(0);
+        PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForInvoiceId(invoice.getId().toString());
+
+        assertNotNull(paymentAttempt);
+        assertEquals(notification.getNotificationKey(), paymentAttempt.getPaymentAttemptId().toString());
+        assertEquals(paymentAttempt.getRetryCount(), new Integer(1));
+
+        DateTime expectedRetryDate = paymentAttempt.getPaymentAttemptDate().plusDays(paymentConfig.getPaymentRetryDays().get(0));
+
+        assertEquals(notification.getEffectiveDate(), expectedRetryDate);
+        assertEquals(paymentAttempt.getNextRetryDate(), expectedRetryDate);
+    }
+
+    @Test
+    public void testRetries() throws Exception {
+        final DateTime now = new DateTime(DateTimeZone.UTC);
+        final Account account = testHelper.createTestCreditCardAccount();
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+        final BigDecimal amount = new BigDecimal("10.00");
+        final UUID subscriptionId = UUID.randomUUID();
+
+        invoice.addInvoiceItem(new RecurringInvoiceItem(invoice.getId(),
+                                                       subscriptionId,
+                                                       "test plan", "test phase",
+                                                       now,
+                                                       now.plusMonths(1),
+                                                       amount,
+                                                       new BigDecimal("1.0"),
+                                                       Currency.USD,
+                                                       new DateTime(DateTimeZone.UTC),
+                                                       new DateTime(DateTimeZone.UTC)));
+
+        DateTime nextRetryDate = new DateTime(DateTimeZone.UTC).minusDays(1);
+        DateTime paymentAttemptDate = nextRetryDate.minusDays(paymentConfig.getPaymentRetryDays().get(0));
+        PaymentAttempt paymentAttempt = new PaymentAttempt(UUID.randomUUID(), invoice).cloner()
+                                                                                      .setRetryCount(1)
+                                                                                      .setPaymentAttemptDate(paymentAttemptDate)
+                                                                                      .setNextRetryDate(nextRetryDate)
+                                                                                      .build();
+
+        paymentDao.createPaymentAttempt(paymentAttempt);
+        retryService.scheduleRetry(paymentAttempt, nextRetryDate);
+
+        // wait a little to give the queue time to process
+        Thread.sleep(paymentConfig.getNotificationSleepTimeMs() * 2);
+
+        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
+
+        assertEquals(pendingNotifications.size(), 0);
+
+        List<PaymentInfo> paymentInfos = paymentApi.getPaymentInfo(Arrays.asList(invoice.getId().toString()));
+
+        assertEquals(paymentInfos.size(), 1);
+
+        PaymentInfo paymentInfo = paymentInfos.get(0);
+
+        assertEquals(paymentInfo.getStatus(), PaymentStatus.Processed.toString());
+
+        PaymentAttempt updatedAttempt = paymentApi.getPaymentAttemptForInvoiceId(invoice.getId().toString());
+
+        assertEquals(updatedAttempt.getPaymentAttemptId(), paymentAttempt.getPaymentAttemptId());
+        assertEquals(paymentInfo.getPaymentId(), updatedAttempt.getPaymentId());
+
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 8e2aaf8..c0c88fe 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.notificationq;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -64,6 +65,12 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
+    public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
+        Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
+        dao.insertNotification(notification);
+    }
+
+    @Override
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
             final DateTime futureNotificationTime, final NotificationKey notificationKey) {
         NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index e1dcdbf..fb88d4c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -23,9 +23,18 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 
 public interface NotificationQueue {
 
-    /**
+   /**
+    *
+    * Record the need to be called back when the notification is ready
+    *
+    * @param futureNotificationTime the time at which the notification is ready
+    * @param notificationKey the key for that notification
+    */
+   public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+   /**
     *
-    *  Record from within a transaction the need to be called back when the notification is ready
+    * Record from within a transaction the need to be called back when the notification is ready
     *
     * @param transactionalDao the transactionalDao
     * @param futureNotificationTime the time at which the notification is ready
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e96d2cf..b76a8ad 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -49,9 +49,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotificationFromTransaction(
-            Transmogrifier transactionalDao, DateTime futureNotificationTime,
-            NotificationKey notificationKey) {
+    public void recordFutureNotification(DateTime futureNotificationTime, NotificationKey notificationKey) {
         Notification notification = new DefaultNotification("MockQueue", notificationKey.toString(), futureNotificationTime);
         synchronized(notifications) {
             notifications.add(notification);
@@ -59,6 +57,24 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
+    public void recordFutureNotificationFromTransaction(
+            Transmogrifier transactionalDao, DateTime futureNotificationTime,
+            NotificationKey notificationKey) {
+        recordFutureNotification(futureNotificationTime, notificationKey);
+    }
+
+    public List<Notification> getPendingEvents() {
+        List<Notification> result = new ArrayList<Notification>();
+
+        for (Notification notification : notifications) {
+            if (notification.getProcessingState() == NotificationLifecycleState.AVAILABLE) {
+                result.add(notification);
+            }
+        }
+        return result;
+    }
+
+    @Override
     protected int doProcessEvents(int sequenceId) {
 
         int result = 0;