killbill-aplcache

Rework retry framework for payment Add account locking and

6/1/2012 9:54:19 PM

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index 85f0449..6081d19 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -207,7 +207,7 @@ public enum ErrorCode {
     /*
      * Range 7000 : Payment
      */
-    PAYMENT_INTERNAL_ERROR(7010, "Internal payment error : %s"),
+    
     PAYMENT_NO_SUCH_PAYMENT_METHOD(7001, "Payment method for account %s, and paymentId %s does not exist"),
     PAYMENT_NO_PAYMENT_METHODS(7002, "Payment methods for account %s don't exist"),
     PAYMENT_UPD_GATEWAY_FAILED(7003, "Failed to update payment gateway for account %s : %s"),
@@ -217,13 +217,16 @@ public enum ErrorCode {
     PAYMENT_UPD_PAYMENT_METHOD(7007, "Failed to update payment method for account %s : %s"),            
     PAYMENT_CREATE_PAYMENT(7008, "Failed to create payment for account %s : %s"),                
     PAYMENT_CREATE_PAYMENT_FOR_ATTEMPT(7009, "Failed to create payment for account %s and attempt %s : %s"),                    
-    PAYMENT_CREATE_PAYMENT_FOR_ATTEMPT_WITH_NON_POSITIVE_INV(70010, "Got payment attempt with negative or null invoice for account %s"),                        
+    PAYMENT_CREATE_PAYMENT_FOR_ATTEMPT_WITH_NON_POSITIVE_INV(7010, "Got payment attempt with negative or null invoice for account %s"),                        
     PAYMENT_CREATE_PAYMENT_FOR_ATTEMPT_BAD(7011, "Failed to create payment for attempts %s "),                    
     PAYMENT_CREATE_PAYMENT_PROVIDER_ACCOUNT(7012, "Failed to create payment provider account for account %s : %s"),                
     PAYMENT_UPD_PAYMENT_PROVIDER_ACCOUNT(7013, "Failed to update payment provider account for account %s : %s"),
     PAYMENT_GET_PAYMENT_PROVIDER_ACCOUNT(7014, "Failed to retrieve payment provider account for account %s : %s"),                        
     PAYMENT_CREATE_REFUND(7014, "Failed to create refund for account %s : %s"),                
-    
+    PAYMENT_NULL_INVOICE(7015, "Invoice %s has a balance <= 0 "),      
+    PAYMENT_AMOUNT_DENIED(7016, "Payment amount requested for invoice %s is greater than invoice balance [%f/%f]"),         
+    PAYMENT_INTERNAL_ERROR(7017, "Internal payment error : %s"),
+    PAYMENT_PLUGIN_TIMEOUT(7017, "Plugin timeout "),    
     /*
     *
     * Range 9000: Miscellaneous
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index 4f2f6fd..2e2db33 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -15,6 +15,7 @@
  */
 package com.ning.billing.payment.api;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.UUID;
 
@@ -57,10 +58,10 @@ public interface PaymentApi {
     public void updatePaymentProviderAccountContact(String accountKey, CallContext context)
     throws PaymentApiException;
     
-    public Payment createPayment(final String accountKey, final UUID invoiceId, final CallContext context)
+    public Payment createPayment(final String accountKey, final UUID invoiceId, final BigDecimal amount, final CallContext context)
     throws PaymentApiException;
 
-    public Payment createPayment(final Account account, final UUID invoiceId, final CallContext context)
+    public Payment createPayment(final Account account, final UUID invoiceId, final BigDecimal amount, final CallContext context)
     throws PaymentApiException;
 
     public Refund createRefund(final Account account, final UUID paymentId, final CallContext context)
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java b/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
index 5f12bb5..9e44ff7 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
@@ -19,5 +19,6 @@ public enum PaymentStatus {
     UNKNOWN,
     AUTO_PAY_OFF,
     ERROR,
-    SUCCESS
+    SUCCESS,
+    ABORTED
 }
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index a94cf5d..eadc1fb 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -220,11 +220,11 @@ public class TestOverdueIntegration extends TestIntegrationBase {
         paymentPlugin.makeAllInvoicesFail(false);
         Collection<Invoice> invoices = invoiceApi.getUnpaidInvoicesByAccountId(account.getId(), clock.getUTCNow());
         List<String> invoiceIds = new ArrayList<String>();
-        for(Invoice invoice : invoices) {
+        for (Invoice invoice : invoices) {
             invoiceIds.add(invoice.getId().toString()); 
-            if(invoice.getBalance().compareTo(BigDecimal.ZERO) > 0) {
+            if (invoice.getBalance().compareTo(BigDecimal.ZERO) > 0) {
                 busHandler.pushExpectedEvent(NextEvent.PAYMENT);
-                paymentApi.createPayment(account, invoice.getId(), new DefaultCallContext("test", null, null, clock));
+                paymentApi.createPayment(account, invoice.getId(), invoice.getBalance(), new DefaultCallContext("test", null, null, clock));
                 assertTrue(busHandler.isCompleted(DELAY));
             }
         }
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/InvoiceResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/InvoiceResource.java
index 079f1df..46c7a48 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/InvoiceResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/InvoiceResource.java
@@ -19,7 +19,6 @@ package com.ning.billing.jaxrs.resources;
 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
@@ -65,12 +64,13 @@ import com.ning.billing.jaxrs.util.JaxrsUriBuilder;
 import com.ning.billing.payment.api.Payment;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentApiException;
-import com.ning.billing.payment.api.PaymentInfoEvent;
 
 
 @Path(JaxrsResource.INVOICES_PATH)
 public class InvoiceResource extends JaxRsResourceBase {
+    
     private static final Logger log = LoggerFactory.getLogger(InvoiceResource.class);
+    
     private static final String ID_PARAM_NAME = "invoiceId";
     private static final String CUSTOM_FIELD_URI = JaxrsResource.CUSTOM_FIELDS + "/{" + ID_PARAM_NAME + ":" + UUID_PATTERN + "}";
     private static final String TAG_URI = JaxrsResource.TAGS + "/{" + ID_PARAM_NAME + ":" + UUID_PATTERN + "}";
@@ -191,7 +191,7 @@ public class InvoiceResource extends JaxRsResourceBase {
             @HeaderParam(HDR_COMMENT) final String comment) {
         try {
             Account account = accountApi.getAccountById(UUID.fromString(payment.getAccountId()));
-            paymentApi.createPayment(account, UUID.fromString(payment.getInvoiceId()), context.createContext(createdBy, reason, comment));
+            paymentApi.createPayment(account, UUID.fromString(payment.getInvoiceId()), null, context.createContext(createdBy, reason, comment));
             Response response = uriBuilder.buildResponse(InvoiceResource.class, "getPayments", payment.getInvoiceId());
             return response;
         } catch (PaymentApiException e) {
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index 195cfea..2d61934 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.payment.api;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.UUID;
 
@@ -88,15 +89,15 @@ public class DefaultPaymentApi implements PaymentApi {
      }
    
     @Override
-    public Payment createPayment(final String accountKey, final UUID invoiceId, final CallContext context) 
+    public Payment createPayment(final String accountKey, final UUID invoiceId, final BigDecimal amount, final CallContext context) 
     throws PaymentApiException {
-        return paymentProcessor.createPayment(accountKey, invoiceId, context);
+        return paymentProcessor.createPayment(accountKey, invoiceId, amount, context, true);
      }
     
     @Override
     public Payment createPayment(Account account, UUID invoiceId,
-            CallContext context) throws PaymentApiException {
-        return paymentProcessor.createPayment(account, invoiceId, context);        
+            final BigDecimal amount, CallContext context) throws PaymentApiException {
+        return paymentProcessor.createPayment(account, invoiceId, amount, context, true);        
     }
 
     
diff --git a/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
index 65e0114..5757a49 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
@@ -15,7 +15,12 @@
  */
 package com.ning.billing.payment.core;
 
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+
+import java.util.concurrent.ExecutorService;
+
 import com.google.inject.Inject;
+import com.google.inject.name.Named;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
@@ -36,8 +41,9 @@ public class AccountProcessor extends ProcessorBase {
     public AccountProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final Bus eventBus,
-            final GlobalLocker locker) {
-        super(pluginRegistry, accountUserApi, eventBus, locker);        
+            final GlobalLocker locker,
+            @Named(PLUGIN_EXECUTOR) final ExecutorService executor) {
+        super(pluginRegistry, accountUserApi, eventBus, locker, executor);        
     }
     
     public String createPaymentProviderAccount(Account account, CallContext context) 
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
index dd3beae..1a5f630 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
@@ -15,10 +15,14 @@
  */
 package com.ning.billing.payment.core;
 
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import javax.inject.Inject;
 
+import com.google.inject.name.Named;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.payment.api.PaymentApiException;
@@ -36,8 +40,9 @@ public class PaymentMethodProcessor extends ProcessorBase {
     public PaymentMethodProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final Bus eventBus,
-            final GlobalLocker locker) {
-        super(pluginRegistry, accountUserApi, eventBus, locker);
+            final GlobalLocker locker,
+            @Named(PLUGIN_EXECUTOR)  final ExecutorService executor) {
+        super(pluginRegistry, accountUserApi, eventBus, locker, executor);
     }
     
     //@Override
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index ed945fe..5791a33 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -15,11 +15,14 @@
  */
 package com.ning.billing.payment.core;
 
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 
 import javax.inject.Inject;
 
@@ -27,6 +30,7 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.inject.name.Named;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
@@ -43,6 +47,7 @@ import com.ning.billing.payment.api.PaymentStatus;
 import com.ning.billing.payment.dao.PaymentAttemptModelDao;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.dao.PaymentModelDao;
+import com.ning.billing.payment.dispatcher.PluginDispatcher;
 import com.ning.billing.payment.plugin.api.PaymentInfoPlugin;
 import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
 import com.ning.billing.payment.plugin.api.PaymentProviderPlugin;
@@ -67,6 +72,8 @@ public class PaymentProcessor extends ProcessorBase {
     private final CallContextFactory factory;
     private final Clock clock;
     
+    private final PluginDispatcher<Payment> paymentPluginDispatcher;
+    private final PluginDispatcher<Void> voidPluginDispatcher;    
     
     private static final Logger log = LoggerFactory.getLogger(PaymentProcessor.class);
 
@@ -80,14 +87,17 @@ public class PaymentProcessor extends ProcessorBase {
             final Bus eventBus,
             final Clock clock,
             final GlobalLocker locker,
+            @Named(PLUGIN_EXECUTOR) final ExecutorService executor,            
             final CallContextFactory factory) {
-        super(pluginRegistry, accountUserApi, eventBus, locker);
+        super(pluginRegistry, accountUserApi, eventBus, locker, executor);
         this.invoicePaymentApi = invoicePaymentApi;
         this.retryService = retryService;
         this.paymentDao = paymentDao;
         this.clock = clock;
         this.config = config;
         this.factory = factory;
+        this.paymentPluginDispatcher = new PluginDispatcher<Payment>(executor);
+        this.voidPluginDispatcher = new PluginDispatcher<Void>(executor);
     }
   
 
@@ -110,41 +120,57 @@ public class PaymentProcessor extends ProcessorBase {
         return result;
     }
 
-    public Payment createPayment(final String accountKey, final UUID invoiceId, final CallContext context) 
+    public Payment createPayment(final String accountKey, final UUID invoiceId, final BigDecimal inputAmount, final CallContext context, final boolean isInstantPayment) 
     throws PaymentApiException {
         try {
             final Account account = accountUserApi.getAccountByKey(accountKey);
-            return createPayment(account, invoiceId, context);
+            return createPayment(account, invoiceId, inputAmount, context, isInstantPayment);
         } catch (AccountApiException e) {
             throw new PaymentApiException(e);
         }
     }
 
-    public Payment createPayment(final Account account, final UUID invoiceId,
-            final CallContext context) throws PaymentApiException {
-        
+    public Payment createPayment(final Account account, final UUID invoiceId, final BigDecimal inputAmount , final CallContext context,  final boolean isInstantPayment)
+    throws PaymentApiException {
+
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
-        
-        return new WithAccountLock<Payment>().processAccountWithLock(locker, account.getExternalKey(), new WithAccountLockCallback<Payment>() {
+
+        return paymentPluginDispatcher.dispatchWithAccountLock(new WithAccountLockAndTimeout<Payment>(locker,
+                account.getExternalKey(),
+                new WithAccountLockCallback<Payment>() {
 
             @Override
             public Payment doOperation() throws PaymentApiException {
-
                 final Invoice invoice = invoicePaymentApi.getInvoice(invoiceId);
 
                 if (invoice.isMigrationInvoice()) {
                     log.error("Received invoice for payment that is a migration invoice - don't know how to handle those yet: {}", invoice);
                     return null;
                 }
-                if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
-                    return null;
-                }
-                return processNewPaymentWithAccountLocked(plugin, account, invoice, context);
+
+                BigDecimal requestedAmount = getAndValidatePaymentAmount(invoice, inputAmount, isInstantPayment);
+                return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
             }
-        });
+        }));
     }
 
-    public void retryPayment(final UUID paymentId) {
+    private BigDecimal getAndValidatePaymentAmount(final Invoice invoice,  final BigDecimal inputAmount, final boolean isInstantPayment)
+    throws PaymentApiException {
+
+        if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
+            throw new PaymentApiException(ErrorCode.PAYMENT_NULL_INVOICE, invoice.getId());
+        }
+        if (isInstantPayment &&
+                inputAmount != null &&
+                invoice.getBalance().compareTo(inputAmount) < 0) {
+                    throw new PaymentApiException(ErrorCode.PAYMENT_AMOUNT_DENIED,
+                            invoice.getId(), inputAmount.floatValue(), invoice.getBalance().floatValue());   
+        }
+        return inputAmount != null ? inputAmount : invoice.getBalance();
+    }
+
+
+    public void retryFailedPayment(final UUID paymentId) {
 
         try {
             final PaymentModelDao payment = paymentDao.getPayment(paymentId);
@@ -152,62 +178,60 @@ public class PaymentProcessor extends ProcessorBase {
                 log.error("Invalid retry for non existnt paymentId {}", paymentId);
                 return;
             }
+            
             final Account account = accountUserApi.getAccountById(payment.getAccountId());
-
-
             final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
-
-
             final CallContext context = factory.createCallContext("PaymentRetry", CallOrigin.INTERNAL, UserType.SYSTEM);
-
-            new WithAccountLock<Void>().processAccountWithLock(locker, account.getExternalKey(), new WithAccountLockCallback<Void>() {
+            
+            voidPluginDispatcher.dispatchWithAccountLock(new WithAccountLockAndTimeout<Void>(locker,
+                    account.getExternalKey(),
+                    new WithAccountLockCallback<Void>() {
 
                 @Override
                 public Void doOperation() throws PaymentApiException {
-
                     final Invoice invoice = invoicePaymentApi.getInvoice(payment.getInvoiceId());
-                    // STEPH invoice API does not throw if no invoice?
                     if (invoice.isMigrationInvoice()) {
                         return null;
                     }
                     if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
                         return null;
                     }
-                    // STEPH what if invoice balance is now less than what is reflected in the Payment object?
-
-                    processRetryPaymentWithAccountLocked(plugin, account, invoice, payment, context);
+                    processRetryPaymentWithAccountLocked(plugin, account, invoice, payment, invoice.getBalance(), context);
                     return null;
+
                 }
-            });
+            }));
 
         } catch (AccountApiException e) {
             log.error(String.format("Failed to retry payment for paymentId %s", paymentId), e);
         } catch (PaymentApiException e) {
-            log.error(String.format("Failed to retry payment for paymentId %s", paymentId), e);
+            log.info(String.format("Failed to retry payment for paymentId %s", paymentId));
         }
     }
 
    
 
     private Payment processNewPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice,
-            CallContext context) throws PaymentApiException {
-        PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), invoice.getTotalAmount().setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
-        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow());
+            BigDecimal requestedAmount, boolean isInstantPayment, CallContext context) throws PaymentApiException {
+        
+        PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), requestedAmount.setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
+        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
             
         PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
-        return processPaymentWithAccountLocked(plugin, account, invoice, savedPayment, attempt, context);
+        return processPaymentWithAccountLocked(plugin, account, invoice, savedPayment, attempt, isInstantPayment, context);
 
     }
     
     private Payment processRetryPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice, PaymentModelDao payment,
-            CallContext context) throws PaymentApiException {
-        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow());
+            BigDecimal requestedAmount, CallContext context) throws PaymentApiException {
+        
+        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
         paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, context);
-        return processPaymentWithAccountLocked(plugin, account, invoice, payment, attempt, context);
+        return processPaymentWithAccountLocked(plugin, account, invoice, payment, attempt, false, context);
     }
 
     private Payment processPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice,
-            PaymentModelDao paymentInput, PaymentAttemptModelDao attemptInput, CallContext context) throws PaymentApiException {
+            PaymentModelDao paymentInput, PaymentAttemptModelDao attemptInput, boolean isInstantPayment, CallContext context) throws PaymentApiException {
         
         BusEvent event = null;
         List<PaymentAttemptModelDao> allAttempts = null;
@@ -215,9 +239,10 @@ public class PaymentProcessor extends ProcessorBase {
         PaymentModelDao payment = null;
         try {
             
-            PaymentInfoPlugin paymentPluginInfo =  plugin.processPayment(account.getExternalKey(), paymentInput.getId(), paymentInput.getAmount());
+            PaymentInfoPlugin paymentPluginInfo =  plugin.processPayment(account.getExternalKey(), paymentInput.getId(), attemptInput.getRequestedAmount());
             
             // STEPH check if plugin returns UNKNOWN (exception from plugin)
+            // Does plugin throws or returns ERROR?
             PaymentStatus paymentStatus = paymentPluginInfo.getStatus() ==  PaymentPluginStatus.ERROR ? PaymentStatus.ERROR : PaymentStatus.SUCCESS;
             
             paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), paymentStatus, paymentPluginInfo.getError(), attemptInput.getId(), context);
@@ -238,12 +263,20 @@ public class PaymentProcessor extends ProcessorBase {
       
         } catch (PaymentPluginApiException e) {
 
+            final PaymentStatus errorStatus = isInstantPayment ? PaymentStatus.ABORTED : PaymentStatus.ERROR;
+            
+            paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), errorStatus, e.getMessage(), attemptInput.getId(), context);
+
             allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
             lastAttempt = allAttempts.get(allAttempts.size() - 1);
 
             log.info(String.format("Could not process payment for account %s, invoice %s, error = %s",
                     account.getId(), invoice.getId(), e.getMessage()));
-            scheduleRetry(paymentInput.getId(), lastAttempt.getEffectiveDate(), allAttempts.size());
+            
+            if (!isInstantPayment) {
+                scheduleRetry(paymentInput.getId(), lastAttempt.getEffectiveDate(), allAttempts.size(), context);
+            }
+            
             event = new DefaultPaymentErrorEvent(account.getId(), invoice.getId(), paymentInput.getId(), e.getErrorMessage(), context.getUserToken());                        
             throw new PaymentApiException(e, ErrorCode.PAYMENT_CREATE_PAYMENT, account.getId(), e.getMessage());
 
@@ -253,25 +286,23 @@ public class PaymentProcessor extends ProcessorBase {
         return new DefaultPayment(payment, allAttempts);
     }
 
-    private void scheduleRetry(final UUID paymentId, final DateTime lastAttemptDate, final int numberAttempts) {
-
+    private void scheduleRetry(final UUID paymentId, final DateTime lastAttemptDate, final int numberAttempts, final CallContext context) {
 
         final List<Integer> retryDays = config.getPaymentRetryDays();
-        int retryCount = numberAttempts;
-
+        int retryCount = numberAttempts - 1;
         if (retryCount < retryDays.size()) {
             int retryInDays = 0;
-            DateTime nextRetryDate = lastAttemptDate;
+            DateTime nextRetryDate = clock.getUTCNow();
             try {
                 retryInDays = retryDays.get(retryCount);
                 nextRetryDate = nextRetryDate.plusDays(retryInDays);
-
                 retryService.scheduleRetry(paymentId, nextRetryDate);
             } catch (NumberFormatException ex) {
                 log.error("Could not get retry day for retry count {}", retryCount);
             }
         } else if (retryCount == retryDays.size()) {
             log.info("Last payment retry failed for {} ", paymentId);
+            paymentDao.updateStatusForPayment(paymentId, PaymentStatus.ABORTED, context);
         } else {
             log.error("Cannot update payment retry information because retry count is invalid {} ", retryCount);
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index 8701a18..f342094 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -16,6 +16,9 @@
 package com.ning.billing.payment.core;
 
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,18 +46,20 @@ public abstract class ProcessorBase {
     protected final AccountUserApi accountUserApi;
     protected final Bus eventBus;
     protected final GlobalLocker locker;
-    
+    protected final ExecutorService executor;
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorBase.class);
     
     public ProcessorBase(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final Bus eventBus,
-            final GlobalLocker locker) {
+            final GlobalLocker locker,
+            final ExecutorService executor) {
         this.pluginRegistry = pluginRegistry;
         this.accountUserApi = accountUserApi;
         this.eventBus= eventBus;
         this.locker = locker;
+        this.executor = executor;
     }
     
     
@@ -100,7 +105,30 @@ public abstract class ProcessorBase {
         public T doOperation() throws PaymentApiException;
     }
     
+    
+
+    public static class WithAccountLockAndTimeout<T> implements Callable<T> {
+        
+        private final GlobalLocker locker;
+        private final String accountExternalKey;
+        private final WithAccountLockCallback<T> callback;
+        
+        public WithAccountLockAndTimeout(final GlobalLocker locker,
+                final String accountExternalKey,
+                final WithAccountLockCallback<T> callback) {
+            this.locker = locker;
+            this.accountExternalKey = accountExternalKey;
+            this.callback = callback;
+        }
+
+        @Override
+        public T call() throws Exception {
+            return new WithAccountLock<T>().processAccountWithLock(locker, accountExternalKey, callback);
+        }
+    }
+    
     public static class WithAccountLock<T> {
+        
         public T processAccountWithLock(final GlobalLocker locker, final String accountExternalKey, final WithAccountLockCallback<T> callback)
          throws PaymentApiException {
             GlobalLock lock = null;
diff --git a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
index a1c4cbd..1831de3 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
@@ -15,10 +15,14 @@
  */
 package com.ning.billing.payment.core;
 
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 
 import javax.inject.Inject;
 
+import com.google.inject.name.Named;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.payment.api.PaymentApiException;
@@ -34,8 +38,9 @@ public class RefundProcessor extends ProcessorBase {
     public RefundProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final Bus eventBus,
-            final GlobalLocker locker) {
-        super(pluginRegistry, accountUserApi, eventBus, locker);        
+            final GlobalLocker locker,
+            @Named(PLUGIN_EXECUTOR)  final ExecutorService executor) {
+        super(pluginRegistry, accountUserApi, eventBus, locker, executor);        
     }
     
     public Refund createRefund(Account account, UUID paymentId, CallContext context)
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
index a98b067..da3ae01 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
@@ -15,6 +15,7 @@
  */
 package com.ning.billing.payment.dao;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.UUID;
 
@@ -53,7 +54,10 @@ public class AuditedPaymentDao implements PaymentDao {
             @Override
             public PaymentAttemptModelDao inTransaction(PaymentAttemptSqlDao transactional, TransactionStatus status)
             throws Exception {
-                return insertPaymentAttemptFromTransaction(attempt, context, transactional);
+                PaymentAttemptModelDao savedAttempt = insertPaymentAttemptFromTransaction(attempt, context, transactional);
+                PaymentSqlDao transPaymentSqlDao = transactional.become(PaymentSqlDao.class);
+                updatePaymentAmountFromTransaction(paymentId, savedAttempt.getRequestedAmount(), context, transPaymentSqlDao);
+                return savedAttempt;
             }
         });
     }
@@ -114,6 +118,20 @@ public class AuditedPaymentDao implements PaymentDao {
     
 
     @Override
+    public void updateStatusForPayment(final UUID paymentId,
+            final PaymentStatus paymentStatus, final CallContext context) {
+        paymentSqlDao.inTransaction(new Transaction<Void, PaymentSqlDao>() {
+
+            @Override
+            public Void inTransaction(PaymentSqlDao transactional,
+                    TransactionStatus status) throws Exception {
+                updatePaymentStatusFromTransaction(paymentId, paymentStatus, context, transactional);
+                return null;
+            }
+        });
+    }
+
+    @Override
     public void updateStatusForPaymentWithAttempt(final UUID paymentId,
             final PaymentStatus paymentStatus, final String paymentError, final UUID attemptId,
             final CallContext context) {
@@ -129,7 +147,18 @@ public class AuditedPaymentDao implements PaymentDao {
             }
         });
     }
-    
+
+    private void updatePaymentAmountFromTransaction(final UUID paymentId, final BigDecimal amount, final CallContext context, final PaymentSqlDao transactional) {
+        transactional.updatePaymentAmount(paymentId.toString(), amount, context);
+        PaymentModelDao savedPayment = transactional.getPayment(paymentId.toString());
+        Long recordId = transactional.getRecordId(savedPayment.getId().toString());
+        EntityHistory<PaymentModelDao> history = new EntityHistory<PaymentModelDao>(savedPayment.getId(), recordId, savedPayment, ChangeType.UPDATE);
+        transactional.insertHistoryFromTransaction(history, context);
+        Long historyRecordId = transactional.getHistoryRecordId(recordId);
+        EntityAudit audit = new EntityAudit(TableName.PAYMENTS, historyRecordId, ChangeType.UPDATE);
+        transactional.insertAuditFromTransaction(audit, context);
+    }
+
     private void updatePaymentStatusFromTransaction(final UUID paymentId, final PaymentStatus paymentStatus, final CallContext context, final PaymentSqlDao transactional) {
         transactional.updatePaymentStatus(paymentId.toString(), paymentStatus.toString(), context);
         PaymentModelDao savedPayment = transactional.getPayment(paymentId.toString());
@@ -223,4 +252,7 @@ public class AuditedPaymentDao implements PaymentDao {
     public List<PaymentAttemptModelDao> getAttemptsForPayment(UUID paymentId) {
         return paymentAttemptSqlDao.getPaymentAttempts(paymentId.toString());
     }
+
+
+
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptHistoryBinder.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptHistoryBinder.java
index 4cae8de..b08658d 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptHistoryBinder.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptHistoryBinder.java
@@ -43,11 +43,12 @@ public @interface PaymentAttemptHistoryBinder {
                 public void bind(@SuppressWarnings("rawtypes") SQLStatement q, PaymentAttemptHistoryBinder bind, EntityHistory<PaymentAttemptModelDao> history) {
                     q.bind("recordId", history.getValue());
                     q.bind("changeType", history.getChangeType().toString());
-                    PaymentAttemptModelDao payment = history.getEntity();
-                    q.bind("id", payment.getId().toString());
-                    q.bind("paymentId", payment.getPaymentId().toString());            
-                    q.bind("processingStatus", payment.getPaymentStatus().toString());
-                    q.bind("paymentError", payment.getPaymentError());                    
+                    PaymentAttemptModelDao attempt = history.getEntity();
+                    q.bind("id", attempt.getId().toString());
+                    q.bind("paymentId", attempt.getPaymentId().toString());            
+                    q.bind("processingStatus", attempt.getPaymentStatus().toString());
+                    q.bind("paymentError", attempt.getPaymentError());   
+                    q.bind("requestedAmount", attempt.getRequestedAmount());                           
                 }
             };
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
index 25254be..b8f66b7 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptModelDao.java
@@ -15,6 +15,7 @@
  */
 package com.ning.billing.payment.dao;
 
+import java.math.BigDecimal;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -29,25 +30,29 @@ public class PaymentAttemptModelDao extends EntityBase {
     private final UUID paymentId;
     private final PaymentStatus processingStatus;
     private final DateTime effectiveDate;
-    private final String paymentError;        
+    private final String paymentError;
+    private final BigDecimal requestedAmount; 
     
     public PaymentAttemptModelDao(UUID id, UUID accountId, UUID invoiceId,
-            UUID paymentId, PaymentStatus processingStatus, DateTime effectiveDate, String paymentError) {
+            UUID paymentId, PaymentStatus processingStatus, DateTime effectiveDate,
+            BigDecimal requestedAmount, String paymentError) {
         super(id);
         this.accountId = accountId;
         this.invoiceId = invoiceId;
         this.paymentId = paymentId;
         this.processingStatus = processingStatus;
         this.effectiveDate = effectiveDate;
+        this.requestedAmount = requestedAmount;
         this.paymentError = paymentError;
     }
     
-    public PaymentAttemptModelDao(UUID accountId, UUID invoiceId, UUID paymentId, DateTime effectiveDate) {
-        this(UUID.randomUUID(), accountId, invoiceId, paymentId, PaymentStatus.UNKNOWN, effectiveDate, null);
+    public PaymentAttemptModelDao(UUID accountId, UUID invoiceId, UUID paymentId, DateTime effectiveDate, BigDecimal requestedAmount) {
+        this(UUID.randomUUID(), accountId, invoiceId, paymentId, PaymentStatus.UNKNOWN, effectiveDate, requestedAmount,  null);
     }
 
     public PaymentAttemptModelDao(PaymentAttemptModelDao src, PaymentStatus newProcessingStatus, String paymentError) {
-        this(src.getId(), src.getAccountId(), src.getInvoiceId(), src.getPaymentId(), newProcessingStatus, src.getEffectiveDate(), paymentError);
+        this(src.getId(), src.getAccountId(), src.getInvoiceId(), src.getPaymentId(), newProcessingStatus,
+                src.getEffectiveDate(), src.getRequestedAmount(), paymentError);
     }
     
     public UUID getAccountId() {
@@ -73,4 +78,8 @@ public class PaymentAttemptModelDao extends EntityBase {
     public String getPaymentError() {
         return paymentError;
     }
+    
+    public BigDecimal getRequestedAmount() {
+        return requestedAmount;
+    }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptSqlDao.java
index 3e6dd3a..8932594 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -15,6 +15,7 @@
  */
 package com.ning.billing.payment.dao;
 
+import java.math.BigDecimal;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
@@ -78,6 +79,7 @@ public interface PaymentAttemptSqlDao extends Transactional<PaymentAttemptSqlDao
             stmt.bind("paymentId", attempt.getPaymentId().toString());            
             stmt.bind("processingStatus", attempt.getPaymentStatus().toString());
             stmt.bind("paymentError", attempt.getPaymentError());            
+            stmt.bind("requestedAmount", attempt.getRequestedAmount());            
         }
     }
     public static class PaymentAttemptModelDaoMapper extends MapperBase implements ResultSetMapper<PaymentAttemptModelDao> {
@@ -92,7 +94,8 @@ public interface PaymentAttemptSqlDao extends Transactional<PaymentAttemptSqlDao
             DateTime effectiveDate = getDate(rs, "effective_date");            
             PaymentStatus processingStatus = PaymentStatus.valueOf(rs.getString("processing_status"));
             String paymentError = rs.getString("payment_error");
-            return new PaymentAttemptModelDao(id, accountId, invoiceId, paymentId, processingStatus, effectiveDate, paymentError);
+            BigDecimal requestedAmount = rs.getBigDecimal("requested_amount");
+            return new PaymentAttemptModelDao(id, accountId, invoiceId, paymentId, processingStatus, effectiveDate, requestedAmount, paymentError);
         }
     }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
index f08e94f..a951454 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
@@ -30,7 +30,9 @@ public interface PaymentDao {
 
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final CallContext context);
 
-    
+
+    public void updateStatusForPayment(final UUID paymentId, final PaymentStatus paymentStatus, final CallContext context);    
+
     public void updateStatusForPaymentWithAttempt(final UUID paymentId, final PaymentStatus paymentStatus, final String paymentError, final UUID attemptId, final CallContext context);
     
     public PaymentAttemptModelDao getPaymentAttempt(final UUID attemptId);
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
index 6e643a0..11c4a8e 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentSqlDao.java
@@ -55,7 +55,11 @@ public interface PaymentSqlDao extends Transactional<PaymentSqlDao>, UpdatableEn
     @SqlUpdate
     void updatePaymentStatus(@Bind("id") final String paymentId, @Bind("paymentStatus") final String paymentStatus,
             @CallContextBinder final CallContext context);
-    
+
+    @SqlUpdate
+    void updatePaymentAmount(@Bind("id") final String paymentId, @Bind("amount") final BigDecimal amount,
+            @CallContextBinder final CallContext context);
+
     @SqlQuery
     PaymentModelDao getPayment(@Bind("id") final String paymentId);
  
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index 6fdb703..904697e 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.ning.billing.lifecycle.LifecycleHandlerType;
 import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-import com.ning.billing.payment.RequestProcessor;
+import com.ning.billing.payment.InvoiceHandler;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
@@ -38,14 +38,14 @@ public class DefaultPaymentService implements PaymentService {
     // STEPH for retry crappiness
     public static final String SERVICE_NAME = "payment-service";
 
-    private final RequestProcessor requestProcessor;
+    private final InvoiceHandler requestProcessor;
     private final Bus eventBus;
     private final PaymentApi api;
     private final FailedPaymentRetryService failedRetryService;
     private final TimedoutPaymentRetryService timedoutRetryService;
 
     @Inject
-    public DefaultPaymentService(final RequestProcessor requestProcessor, final PaymentApi api, final Bus eventBus,
+    public DefaultPaymentService(final InvoiceHandler requestProcessor, final PaymentApi api, final Bus eventBus,
             final FailedPaymentRetryService failedRetryService, final TimedoutPaymentRetryService timedoutRetryService) {
         this.requestProcessor = requestProcessor;
         this.eventBus = eventBus;
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
index 4056f99..0085b88 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
@@ -17,12 +17,16 @@
 package com.ning.billing.payment.glue;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.skife.config.ConfigurationObjectFactory;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
 import com.ning.billing.config.PaymentConfig;
-import com.ning.billing.payment.RequestProcessor;
+import com.ning.billing.payment.InvoiceHandler;
 import com.ning.billing.payment.api.DefaultPaymentApi;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
@@ -39,6 +43,12 @@ import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRet
 import com.ning.billing.payment.retry.TimedoutPaymentRetryService;
 
 public class PaymentModule extends AbstractModule {
+    
+    private final static int PLUGIN_NB_THREADS = 3;
+    private final static String PLUGIN_THREAD_PREFIX = "Plugin-th-";
+    public final static String PLUGIN_EXECUTOR = "PluginExecutor";
+    
+
     private final Properties props;
 
     public PaymentModule() {
@@ -62,7 +72,18 @@ public class PaymentModule extends AbstractModule {
         bind(FailedPaymentRetryServiceScheduler.class).asEagerSingleton();
     }
     
+    
     protected void installProcessors() {
+        final ExecutorService pluginExecutorService = Executors.newFixedThreadPool(PLUGIN_NB_THREADS, new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread th = new Thread(r);
+                th.setName(PLUGIN_THREAD_PREFIX + th.getId());
+                return th;
+            }
+        });
+        bind(ExecutorService.class).annotatedWith(Names.named(PLUGIN_EXECUTOR)).toInstance(pluginExecutorService);
         bind(AccountProcessor.class).asEagerSingleton();
         bind(PaymentProcessor.class).asEagerSingleton();
         bind(RefundProcessor.class).asEagerSingleton();
@@ -77,7 +98,7 @@ public class PaymentModule extends AbstractModule {
         bind(PaymentConfig.class).toInstance(paymentConfig);
         bind(PaymentProviderPluginRegistry.class).to(DefaultPaymentProviderPluginRegistry.class).asEagerSingleton();
         bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
-        bind(RequestProcessor.class).asEagerSingleton();
+        bind(InvoiceHandler.class).asEagerSingleton();
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
         installPaymentDao();
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
new file mode 100644
index 0000000..5ea5125
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -0,0 +1,109 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.payment.retry;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.payment.glue.DefaultPaymentService;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public abstract class BaseRetryService implements RetryService {
+    
+    private static final Logger log = LoggerFactory.getLogger(BaseRetryService.class);
+    
+    private final NotificationQueueService notificationQueueService;
+    private final Clock clock;
+    private final PaymentConfig config;
+    
+    private NotificationQueue retryQueue;
+    
+    public BaseRetryService(final NotificationQueueService notificationQueueService,
+            final Clock clock, final PaymentConfig config) {
+        this.notificationQueueService = notificationQueueService;
+        this.clock = clock;
+        this.config = config;
+    }
+   
+
+    @Override
+    public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
+        retryQueue = notificationQueueService.createNotificationQueue(svcName, getQueueName(), new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
+                retry(UUID.fromString(notificationKey));
+            }
+        },
+        config);
+    }
+
+    @Override
+    public void start() {
+        retryQueue.startQueue();
+    }
+
+    @Override
+    public void stop() throws NoSuchNotificationQueue {
+        if (retryQueue != null) {
+            retryQueue.stopQueue();
+            notificationQueueService.deleteNotificationQueue(retryQueue.getServiceName(), retryQueue.getQueueName());
+        }
+    }
+    
+    public abstract String getQueueName();
+    
+    
+    public abstract static class RetryServiceScheduler {
+        
+        private final NotificationQueueService notificationQueueService;
+        
+        @Inject
+        public RetryServiceScheduler(final NotificationQueueService notificationQueueService) {
+            this.notificationQueueService = notificationQueueService;
+        }
+    
+        
+        public void scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
+
+            try {
+                NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, getQueueName());
+                NotificationKey key = new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return paymentId.toString();
+                    }
+                };
+                if (retryQueue != null) {
+                    retryQueue.recordFutureNotification(timeOfRetry, key);
+                }
+            } catch (NoSuchNotificationQueue e) {
+                log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
+            }
+        }
+        public abstract String getQueueName();
+    }
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 5cb6c4d..b6807e5 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.payment.retry;
 
-import java.util.List;
 import java.util.UUID;
 
 import com.ning.billing.util.callcontext.CallContext;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.config.PaymentConfig;
+import com.ning.billing.payment.api.PaymentApiException;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.payment.dao.PaymentDao;
 import com.ning.billing.payment.glue.DefaultPaymentService;
@@ -43,18 +43,14 @@ import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotifi
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
-public class FailedPaymentRetryService implements RetryService {
+public class FailedPaymentRetryService extends BaseRetryService implements RetryService {
     
     private static final Logger log = LoggerFactory.getLogger(FailedPaymentRetryService.class);
     
     public static final String QUEUE_NAME = "failed-retry";
 
-    private final Clock clock;
-    private final NotificationQueueService notificationQueueService;
-    private final PaymentConfig config;
     private final PaymentProcessor paymentProcessor;
-    
-    private NotificationQueue retryQueue;
+
     
     @Inject
     public FailedPaymentRetryService(final AccountUserApi accountUserApi,
@@ -63,70 +59,33 @@ public class FailedPaymentRetryService implements RetryService {
             final PaymentConfig config,
             final PaymentProcessor paymentProcessor,
             final PaymentDao paymentDao) {
-        this.clock = clock;
-        this.notificationQueueService = notificationQueueService;
-        this.config = config;
+        super(notificationQueueService, clock, config);
         this.paymentProcessor = paymentProcessor;
     }
 
-    @Override
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
-        retryQueue = notificationQueueService.createNotificationQueue(svcName, QUEUE_NAME, new NotificationQueueHandler() {
-            @Override
-            public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
-                CallContext context = new DefaultCallContext("FailedRetryService", CallOrigin.INTERNAL, UserType.SYSTEM, clock);
-                retry(UUID.fromString(notificationKey), context);
-            }
-        },
-        config);
-    }
-
-    @Override
-    public void start() {
-        retryQueue.startQueue();
-    }
-
-    @Override
-    public void stop() throws NoSuchNotificationQueue {
-        if (retryQueue != null) {
-            retryQueue.stopQueue();
-            notificationQueueService.deleteNotificationQueue(retryQueue.getServiceName(), retryQueue.getQueueName());
-        }
-    }
-
     
 
-    private void retry(final UUID paymentId, final CallContext context) {
-        paymentProcessor.retryPayment(paymentId);
+    @Override
+    public void retry(final UUID paymentId) {
+        paymentProcessor.retryFailedPayment(paymentId);
     }
     
     
-    public static class FailedPaymentRetryServiceScheduler {
-        
-        private final NotificationQueueService notificationQueueService;
+    public static class FailedPaymentRetryServiceScheduler extends RetryServiceScheduler {
         
         @Inject
         public FailedPaymentRetryServiceScheduler(final NotificationQueueService notificationQueueService) {
-            this.notificationQueueService = notificationQueueService;
+            super(notificationQueueService);
         }
-    
-        
-        public void scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
 
-            try {
-                NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, QUEUE_NAME);
-                NotificationKey key = new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return paymentId.toString();
-                    }
-                };
-                if (retryQueue != null) {
-                    retryQueue.recordFutureNotification(timeOfRetry, key);
-                }
-            } catch (NoSuchNotificationQueue e) {
-                log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, QUEUE_NAME));
-            }
+        @Override
+        public String getQueueName() {
+            return QUEUE_NAME;
         }
     }
+
+    @Override
+    public String getQueueName() {
+        return QUEUE_NAME;
+    }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
index 52bf1a9..8f181bd 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/RetryService.java
@@ -15,6 +15,8 @@
  */
 package com.ning.billing.payment.retry;
 
+import java.util.UUID;
+
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
 
@@ -27,5 +29,9 @@ public interface RetryService {
     
     public void stop()
         throws NoSuchNotificationQueue;
+    
+    public String getQueueName();
+    
+    public void retry(UUID paymentId);
 
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/TimedoutPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/TimedoutPaymentRetryService.java
index d1cbf77..3a97f69 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/TimedoutPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/TimedoutPaymentRetryService.java
@@ -17,103 +17,62 @@ package com.ning.billing.payment.retry;
 
 import java.util.UUID;
 
-import org.joda.time.DateTime;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.config.PaymentConfig;
-import com.ning.billing.util.callcontext.CallContext;
-import com.ning.billing.util.callcontext.CallOrigin;
-import com.ning.billing.util.callcontext.DefaultCallContext;
-import com.ning.billing.util.callcontext.UserType;
+import com.ning.billing.payment.core.PaymentProcessor;
+import com.ning.billing.payment.dao.PaymentDao;
+
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
-public class TimedoutPaymentRetryService implements RetryService {
+public class TimedoutPaymentRetryService extends BaseRetryService implements RetryService {
 
-    
-    
   private static final Logger log = LoggerFactory.getLogger(TimedoutPaymentRetryService.class);
     
     public static final String QUEUE_NAME = "timedout-retry";
 
-    private final Clock clock;
-    private final NotificationQueueService notificationQueueService;
-    private final PaymentConfig config;
-    private NotificationQueue retryQueue;
+    private final PaymentProcessor paymentProcessor;
 
+    
     @Inject
-    public TimedoutPaymentRetryService(Clock clock,
-                        NotificationQueueService notificationQueueService,
-                        PaymentConfig config) {
-        this.clock = clock;
-        this.notificationQueueService = notificationQueueService;
-        this.config = config;
+    public TimedoutPaymentRetryService(final AccountUserApi accountUserApi,
+            final Clock clock,
+            final NotificationQueueService notificationQueueService,
+            final PaymentConfig config,
+            final PaymentProcessor paymentProcessor,
+            final PaymentDao paymentDao) {
+        super(notificationQueueService, clock, config);
+        this.paymentProcessor = paymentProcessor;
     }
 
-    @Override
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
-        retryQueue = notificationQueueService.createNotificationQueue(svcName, QUEUE_NAME, new NotificationQueueHandler() {
-            @Override
-            public void handleReadyNotification(String notificationKey, DateTime eventDateTime) {
-                CallContext context = new DefaultCallContext("TimedoutRetryService", CallOrigin.INTERNAL, UserType.SYSTEM, clock);
-                retry(notificationKey, context);
-            }
-        },
-        config);
-    }
-
-    @Override
-    public void start() {
-        retryQueue.startQueue();
-    }
+    
 
     @Override
-    public void stop() throws NoSuchNotificationQueue {
-        if (retryQueue != null) {
-            retryQueue.stopQueue();
-            notificationQueueService.deleteNotificationQueue(retryQueue.getServiceName(), retryQueue.getQueueName());
-         }
+    public void retry(final UUID paymentId) {
+        paymentProcessor.retryFailedPayment(paymentId);
     }
+    
+    
+    public static class TimedoutPaymentRetryServiceScheduler extends RetryServiceScheduler {
+        
+        @Inject
+        public TimedoutPaymentRetryServiceScheduler(final NotificationQueueService notificationQueueService) {
+            super(notificationQueueService);
+        }
 
-    public void scheduleRetry(UUID paymentId, DateTime timeOfRetry) {
-
-        /*
-        final String id = paymentAttempt.getPaymentAttemptId().toString();
-=======
-        final String id = paymentAttempt.getId().toString();
->>>>>>> origin/integration:payment/src/main/java/com/ning/billing/payment/RetryService.java
-
-        NotificationKey key = new NotificationKey() {
-            @Override
-            public String toString() {
-                return id;
-            }
-        };
-
-        if (retryQueue != null) {
-            retryQueue.recordFutureNotification(timeOfRetry, key);
+        @Override
+        public String getQueueName() {
+            return QUEUE_NAME;
         }
-        */
     }
 
-    private void retry(String paymentAttemptId, CallContext context) {
-        
-        /*
-        try {
-            PaymentInfoEvent paymentInfo = paymentApi.getPaymentInfoForPaymentAttemptId(paymentAttemptId);
-            if (paymentInfo != null && PaymentStatus.Processed.equals(PaymentStatus.valueOf(paymentInfo.getStatus()))) {
-                return;
-            }
-            paymentApi.createPaymentForPaymentAttempt(UUID.fromString(paymentAttemptId), context);
-        } catch (PaymentApiException e) {
-            log.error(String.format("Failed to retry payment for %s",paymentAttemptId), e);
-        }
-        */
+    @Override
+    public String getQueueName() {
+        return QUEUE_NAME;
     }
 }
diff --git a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index aaf0062..174b349 100644
--- a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -5,6 +5,7 @@ paymentAttemptFields(prefix) ::= <<
     <prefix>payment_id,    
     <prefix>payment_error,
     <prefix>processing_status,
+    <prefix>requested_amount,
     <prefix>created_by,
     <prefix>created_date,
     <prefix>updated_by,
@@ -13,7 +14,7 @@ paymentAttemptFields(prefix) ::= <<
 
 insertPaymentAttempt() ::= <<
     INSERT INTO payment_attempts (<paymentAttemptFields()>)
-    VALUES (:id, :paymentId, :paymentError, :processingStatus, :userName, :createdDate, :userName, :createdDate);
+    VALUES (:id, :paymentId, :paymentError, :processingStatus, :requestedAmount, :userName, :createdDate, :userName, :createdDate);
 >>
 
 getPaymentAttempt() ::= <<
@@ -57,6 +58,7 @@ historyFields(prefix) ::= <<
     <prefix>payment_id,
     <prefix>payment_error,
     <prefix>processing_status,
+    <prefix>requested_amount,    
     <prefix>created_by,
     <prefix>created_date,
     <prefix>updated_by,
@@ -65,7 +67,7 @@ historyFields(prefix) ::= <<
 
 insertHistoryFromTransaction() ::= <<
     INSERT INTO payment_attempt_history (<historyFields()>)
-    VALUES (:recordId, :id, :paymentId, :paymentError, :processingStatus, :userName, :createdDate, :userName, :updatedDate);
+    VALUES (:recordId, :id, :paymentId, :paymentError, :processingStatus, :requestedAmount, :userName, :createdDate, :userName, :updatedDate);
 >>
 
 getHistoryRecordId() ::= <<
diff --git a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
index be9e720..2dc34b6 100644
--- a/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/com/ning/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -48,6 +48,12 @@ updatePaymentStatus() ::= <<
     WHERE id = :id;
 >>
 
+updatePaymentAmount() ::= <<
+    UPDATE payments
+    SET amount = :amount
+    WHERE id = :id;
+>>
+
 getRecordId() ::= <<
     SELECT record_id
     FROM payments
diff --git a/payment/src/main/resources/com/ning/billing/payment/ddl.sql b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
index 391d343..c7b11e2 100644
--- a/payment/src/main/resources/com/ning/billing/payment/ddl.sql
+++ b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
@@ -45,7 +45,8 @@ CREATE TABLE payment_attempts (
     id char(36) NOT NULL,
     payment_id char(36) COLLATE utf8_bin NOT NULL,
     payment_error varchar(256),              
-    processing_status varchar(20),        
+    processing_status varchar(20),
+    requested_amount decimal(8,2),      
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
     updated_by varchar(50) NOT NULL,
@@ -62,7 +63,8 @@ CREATE TABLE payment_attempt_history (
     id char(36) NOT NULL,
     payment_id char(36) COLLATE utf8_bin NOT NULL,
     payment_error varchar(256),              
-    processing_status varchar(20),        
+    processing_status varchar(20),
+    requested_amount decimal(8,2),            
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
     updated_by varchar(50) NOT NULL,
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index 62e76e4..9b963c0 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -19,6 +19,7 @@ package com.ning.billing.payment.api;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
@@ -27,12 +28,16 @@ import java.util.UUID;
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import com.google.inject.Inject;
+import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
@@ -58,6 +63,9 @@ import com.ning.billing.util.glue.CallContextModule;
 @Guice(modules = { PaymentTestModuleWithMocks.class, MockClockModule.class, MockJunctionModule.class, CallContextModule.class })
 @Test(groups = "fast")
 public class TestPaymentApi {
+    
+    private static final Logger log = LoggerFactory.getLogger(TestPaymentApi.class);
+    
     @Inject
     private Bus eventBus;
     @Inject
@@ -84,14 +92,54 @@ public class TestPaymentApi {
         eventBus.stop();
     }
 
+    
     @Test(enabled=true)
-    public void testCreateCreditCardPayment() throws Exception {
+    public void testSimplePaymentWithNoAmount() throws Exception {
+        final BigDecimal invoiceAmount = new BigDecimal("10.0011");
+        final BigDecimal requestedAmount = null;
+        final BigDecimal expectedAmount = invoiceAmount;        
+        
+        testSimplePayment(invoiceAmount, requestedAmount, expectedAmount);
+    }
+
+    @Test(enabled=true)
+    public void testSimplePaymentWithInvoiceAmount() throws Exception {
+        final BigDecimal invoiceAmount = new BigDecimal("10.0011");
+        final BigDecimal requestedAmount = invoiceAmount;
+        final BigDecimal expectedAmount = invoiceAmount;        
+        
+        testSimplePayment(invoiceAmount, requestedAmount, expectedAmount);
+    }
+
+    @Test(enabled=true)
+    public void testSimplePaymentWithLowerAmount() throws Exception {
+        final BigDecimal invoiceAmount = new BigDecimal("10.0011");
+        final BigDecimal requestedAmount = new BigDecimal("8.0091");
+        final BigDecimal expectedAmount = requestedAmount;        
+        
+        testSimplePayment(invoiceAmount, requestedAmount, expectedAmount);
+    }
+
+    @Test(enabled=true)
+    public void testSimplePaymentWithInvalidAmount() throws Exception {
+        final BigDecimal invoiceAmount = new BigDecimal("10.0011");
+        final BigDecimal requestedAmount = new BigDecimal("80.0091");
+        final BigDecimal expectedAmount = null;        
+        
+        testSimplePayment(invoiceAmount, requestedAmount, expectedAmount);
+
+    }
+
+    
+
+    private void testSimplePayment(BigDecimal invoiceAmount, BigDecimal requestedAmount, BigDecimal expectedAmount) throws Exception {
+        
         ((ZombieControl)invoicePaymentApi).addResult("notifyOfPaymentAttempt", BrainDeadProxyFactory.ZOMBIE_VOID);
 
         final DateTime now = new DateTime(DateTimeZone.UTC);
         final Account account = testHelper.createTestCreditCardAccount();
         final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
-        final BigDecimal amount = new BigDecimal("10.0011");
+        
         final UUID subscriptionId = UUID.randomUUID();
         final UUID bundleId = UUID.randomUUID();
 
@@ -102,24 +150,34 @@ public class TestPaymentApi {
                                                        "test plan", "test phase",
                                                        now,
                                                        now.plusMonths(1),
-                                                       amount,
+                                                       invoiceAmount,
                                                        new BigDecimal("1.0"),
                                                        Currency.USD));
 
-        Payment paymentInfo = paymentApi.createPayment(account.getExternalKey(), invoice.getId(), context);
-
-        assertNotNull(paymentInfo.getId());
-        assertTrue(paymentInfo.getAmount().compareTo(amount.setScale(2, RoundingMode.HALF_EVEN)) == 0);
-        assertNotNull(paymentInfo.getPaymentNumber());
-        assertEquals(paymentInfo.getPaymentStatus(), PaymentStatus.SUCCESS);
-        assertEquals(paymentInfo.getAttempts().size(), 1);
-        assertEquals(paymentInfo.getInvoiceId(), invoice.getId());
-        assertTrue(paymentInfo.getAmount().compareTo(amount.setScale(2, RoundingMode.HALF_EVEN)) == 0);
-        assertEquals(paymentInfo.getCurrency(), Currency.USD);
-        
-        PaymentAttempt paymentAttempt = paymentInfo.getAttempts().get(0);
-        assertNotNull(paymentAttempt);
-        assertNotNull(paymentAttempt.getId());
+        try {
+            Payment paymentInfo = paymentApi.createPayment(account.getExternalKey(), invoice.getId(), requestedAmount, context);
+            if (expectedAmount == null) {
+                fail("Expected to fail because requested amount > invoice amount");
+            }
+            assertNotNull(paymentInfo.getId());
+            assertTrue(paymentInfo.getAmount().compareTo(expectedAmount.setScale(2, RoundingMode.HALF_EVEN)) == 0);
+            assertNotNull(paymentInfo.getPaymentNumber());
+            assertEquals(paymentInfo.getPaymentStatus(), PaymentStatus.SUCCESS);
+            assertEquals(paymentInfo.getAttempts().size(), 1);
+            assertEquals(paymentInfo.getInvoiceId(), invoice.getId());
+            assertEquals(paymentInfo.getCurrency(), Currency.USD);
+
+            PaymentAttempt paymentAttempt = paymentInfo.getAttempts().get(0);
+            assertNotNull(paymentAttempt);
+            assertNotNull(paymentAttempt.getId());
+        } catch (PaymentApiException e) {
+            if (expectedAmount != null) {
+                fail("Failed to create payment", e);
+            } else {
+                log.info(e.getMessage());
+                assertEquals(e.getCode(), ErrorCode.PAYMENT_AMOUNT_DENIED.getCode());
+            }
+        }
     }
 
     private PaymentProviderAccount setupAccountWithPaypalPaymentMethod() throws Exception  {
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index cccb124..7ab8b57 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -63,6 +63,18 @@ public class MockPaymentDao implements PaymentDao {
             }
         }
     }
+    
+    @Override
+    public void updateStatusForPayment(UUID paymentId,
+            PaymentStatus paymentStatus, CallContext context) {
+        synchronized(this) {
+            PaymentModelDao entry = payments.remove(paymentId);
+            if (entry != null) {
+               payments.put(paymentId, new PaymentModelDao(entry, paymentStatus));
+            }
+        }
+    }
+
 
     @Override
     public PaymentAttemptModelDao getPaymentAttempt(UUID attemptId) {
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index e3b6a6c..f81262a 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -19,6 +19,7 @@ import static org.testng.Assert.assertEquals;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.UUID;
 
@@ -26,7 +27,6 @@ import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
 import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.IDBI;
-import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 
 import org.testng.annotations.BeforeSuite;
@@ -37,8 +37,6 @@ import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.dbi.DBIProvider;
 import com.ning.billing.dbi.DbiConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
-import com.ning.billing.payment.api.DefaultPayment;
-import com.ning.billing.payment.api.Payment;
 import com.ning.billing.payment.api.PaymentStatus;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.TestCallContext;
@@ -92,7 +90,7 @@ public class TestPaymentDao {
         helper.cleanupAllTables();
     }
     
-    
+ 
     
     @Test(groups={"slow"})
     public void testUpdateStatus() {
@@ -104,7 +102,7 @@ public class TestPaymentDao {
         DateTime effectiveDate = clock.getUTCNow();
         
         PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
-        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow());
+        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
         PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         
         PaymentStatus paymentStatus = PaymentStatus.SUCCESS;
@@ -122,7 +120,6 @@ public class TestPaymentDao {
         assertEquals(savedPayment.getAmount().compareTo(amount), 0);        
         assertEquals(savedPayment.getCurrency(), currency);         
         assertEquals(savedPayment.getEffectiveDate().compareTo(effectiveDate), 0); 
-        Assert.assertNotEquals(savedPayment.getPaymentNumber(), PaymentModelDao.INVALID_PAYMENT_NUMBER);
         assertEquals(savedPayment.getPaymentStatus(), PaymentStatus.SUCCESS); 
         
         List<PaymentAttemptModelDao> attempts =  paymentDao.getAttemptsForPayment(payment.getId());
@@ -133,7 +130,8 @@ public class TestPaymentDao {
         assertEquals(savedAttempt.getAccountId(), accountId);
         assertEquals(savedAttempt.getInvoiceId(), invoiceId); 
         assertEquals(savedAttempt.getPaymentStatus(), PaymentStatus.SUCCESS);
-        assertEquals(savedAttempt.getPaymentError(), paymentError);        
+        assertEquals(savedAttempt.getPaymentError(), paymentError); 
+        assertEquals(savedAttempt.getRequestedAmount().compareTo(amount), 0);                
     }
     
     @Test(groups={"slow"})
@@ -146,7 +144,7 @@ public class TestPaymentDao {
         DateTime effectiveDate = clock.getUTCNow();
         
         PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
-        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow());
+        PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
         
         PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
         assertEquals(savedPayment.getId(), payment.getId());
@@ -156,7 +154,6 @@ public class TestPaymentDao {
         assertEquals(savedPayment.getAmount().compareTo(amount), 0);        
         assertEquals(savedPayment.getCurrency(), currency);         
         assertEquals(savedPayment.getEffectiveDate().compareTo(effectiveDate), 0); 
-        assertEquals(savedPayment.getPaymentNumber(), new Integer(1));
         assertEquals(savedPayment.getPaymentStatus(), PaymentStatus.UNKNOWN); 
 
         
@@ -177,7 +174,6 @@ public class TestPaymentDao {
         assertEquals(savedPayment.getAmount().compareTo(amount), 0);        
         assertEquals(savedPayment.getCurrency(), currency);         
         assertEquals(savedPayment.getEffectiveDate().compareTo(effectiveDate), 0); 
-        assertEquals(savedPayment.getPaymentNumber(), new Integer(1));
         assertEquals(savedPayment.getPaymentStatus(), PaymentStatus.UNKNOWN); 
         
         List<PaymentAttemptModelDao> attempts =  paymentDao.getAttemptsForPayment(payment.getId());
@@ -190,7 +186,54 @@ public class TestPaymentDao {
         assertEquals(savedAttempt.getPaymentStatus(), PaymentStatus.UNKNOWN);
         
     }
+    
+    @Test(groups={"slow"})
+    public void testNewAttempt() {
+        UUID accountId = UUID.randomUUID();
+        UUID invoiceId = UUID.randomUUID(); 
+        BigDecimal amount = new BigDecimal(13);
+        Currency currency = Currency.USD;
+        DateTime effectiveDate = clock.getUTCNow();
+        
+        PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
+        PaymentAttemptModelDao firstAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, context);
+        
+        BigDecimal newAmount = new BigDecimal(15.23).setScale(2, RoundingMode.HALF_EVEN);
+        PaymentAttemptModelDao secondAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), newAmount);        
+        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, context);
+        
+        List<PaymentModelDao> payments = paymentDao.getPaymentsForInvoice(invoiceId);
+        assertEquals(payments.size(), 1);
+        savedPayment = payments.get(0);
+        assertEquals(savedPayment.getId(), payment.getId());
+        assertEquals(savedPayment.getAccountId(), accountId);        
+        assertEquals(savedPayment.getInvoiceId(), invoiceId);        
+        assertEquals(savedPayment.getPaymentMethodId(), null);         
+        assertEquals(savedPayment.getAmount().compareTo(newAmount), 0);        
+        assertEquals(savedPayment.getCurrency(), currency);         
+        assertEquals(savedPayment.getEffectiveDate().compareTo(effectiveDate), 0); 
+        assertEquals(savedPayment.getPaymentStatus(), PaymentStatus.UNKNOWN); 
+
+        List<PaymentAttemptModelDao> attempts =  paymentDao.getAttemptsForPayment(payment.getId());
+        assertEquals(attempts.size(), 2);
+        PaymentAttemptModelDao savedAttempt1 = attempts.get(0);
+        assertEquals(savedAttempt1.getPaymentId(), payment.getId());
+        assertEquals(savedAttempt1.getAccountId(), accountId);
+        assertEquals(savedAttempt1.getInvoiceId(), invoiceId); 
+        assertEquals(savedAttempt1.getPaymentStatus(), PaymentStatus.UNKNOWN);
+        assertEquals(savedAttempt1.getPaymentError(), null); 
+        assertEquals(savedAttempt1.getRequestedAmount().compareTo(amount), 0);        
 
+    
+        PaymentAttemptModelDao savedAttempt2 = attempts.get(1);
+        assertEquals(savedAttempt2.getPaymentId(), payment.getId());
+        assertEquals(savedAttempt2.getAccountId(), accountId);
+        assertEquals(savedAttempt2.getInvoiceId(), invoiceId); 
+        assertEquals(savedAttempt2.getPaymentStatus(), PaymentStatus.UNKNOWN);
+        assertEquals(savedAttempt2.getPaymentError(), null); 
+        assertEquals(savedAttempt2.getRequestedAmount().compareTo(newAmount), 0);        
+}
     @Test(groups={"slow"})
     public void testPaymentMethod() {
         
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 4349993..9bad840 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -59,7 +59,15 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     @Inject
     public MockPaymentProviderPlugin(Clock clock) {
         this.clock = clock;
+        clear();
     }
+    
+    
+    public void clear() {
+        makeNextInvoiceFail.set(false);
+        makeAllInvoicesFail.set(false);
+    }
+    
 
     public void makeNextPaymentFail() {
         makeNextInvoiceFail.set(true);
diff --git a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
index 7156c12..d7bd6c7 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
@@ -16,26 +16,32 @@
 
 package com.ning.billing.payment;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 import java.math.BigDecimal;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
 
+import com.ning.billing.payment.api.Payment;
 import com.ning.billing.payment.api.Payment.PaymentAttempt;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentApiException;
-import com.ning.billing.payment.api.PaymentInfoEvent;
-import com.ning.billing.payment.dao.PaymentDao;
+import com.ning.billing.payment.api.PaymentStatus;
+import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.DefaultCallContext;
 import com.ning.billing.util.callcontext.UserType;
 import org.joda.time.DateTime;
-import org.joda.time.Days;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -52,28 +58,23 @@ import com.ning.billing.mock.BrainDeadProxyFactory;
 import com.ning.billing.mock.BrainDeadProxyFactory.ZombieControl;
 import com.ning.billing.mock.glue.MockClockModule;
 import com.ning.billing.mock.glue.MockJunctionModule;
+import com.ning.billing.payment.glue.DefaultPaymentService;
 import com.ning.billing.payment.glue.PaymentTestModuleWithMocks;
 import com.ning.billing.payment.provider.MockPaymentProviderPlugin;
-import com.ning.billing.payment.provider.DefaultPaymentProviderPluginRegistry;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.glue.CallContextModule;
-import com.ning.billing.util.notificationq.MockNotificationQueue;
-import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationQueueService;
 
 @Guice(modules = { PaymentTestModuleWithMocks.class, MockClockModule.class, MockJunctionModule.class, CallContextModule.class })
-@Test(groups = "fast")
 public class TestRetryService {
     @Inject
     private PaymentConfig paymentConfig;
     @Inject
     private Bus eventBus;
     @Inject
-    private PaymentApi paymentApi;
+    private PaymentProcessor paymentProcessor;
     @Inject
     private InvoicePaymentApi invoicePaymentApi;
     @Inject
@@ -82,41 +83,67 @@ public class TestRetryService {
     private PaymentProviderPluginRegistry registry;
     @Inject
     private FailedPaymentRetryService retryService;
-    @Inject
-    private NotificationQueueService notificationQueueService;
 
     @Inject
-    private Clock clock;
+    private ClockMock clock;
 
     private MockPaymentProviderPlugin mockPaymentProviderPlugin;
-    private MockNotificationQueue mockNotificationQueue;
     private CallContext context;
 
-    @BeforeClass(alwaysRun = true)
+    @BeforeClass(groups = "fast")
     public void initialize() throws Exception {
-        retryService.initialize("payment-service");
+        
     }
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeMethod(groups = "fast")
     public void setUp() throws Exception {
-        eventBus.start();
-        retryService.start();
 
-        mockPaymentProviderPlugin = (MockPaymentProviderPlugin)registry.getPlugin(null);
-        mockNotificationQueue = (MockNotificationQueue)notificationQueueService.getNotificationQueue("payment-service", FailedPaymentRetryService.QUEUE_NAME);
+        retryService.initialize(DefaultPaymentService.SERVICE_NAME);
+        retryService.start();
+        eventBus.start();
+        
+        mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getPlugin(null);
+        mockPaymentProviderPlugin.clear();
+        
         context = new DefaultCallContext("RetryServiceTests", CallOrigin.INTERNAL, UserType.TEST, clock);
         ((ZombieControl)invoicePaymentApi).addResult("notifyOfPaymentAttempt", BrainDeadProxyFactory.ZOMBIE_VOID);
 
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterMethod(groups = "fast")
     public void tearDown() throws Exception {
         retryService.stop();
         eventBus.stop();
     }
 
-    @Test
-    public void testSchedulesRetry() throws Exception {
+    
+    private Payment getPaymentForInvoice(final UUID invoiceId) throws PaymentApiException {
+        List<Payment> payments = paymentProcessor.getInvoicePayments(invoiceId);
+        assertEquals(payments.size(), 1);
+        Payment payment = payments.get(0);
+        assertEquals(payment.getInvoiceId(), invoiceId);
+        return payment;
+    }
+
+
+    @Test(groups = "fast")
+    public void testFailedPaymentWithOneSuccessfulRetry() throws Exception {
+        testSchedulesRetryInternal(1);
+    }
+
+    @Test(groups = "fast")
+    public void testFailedPaymentWithLastRetrySuccess() throws Exception {
+        testSchedulesRetryInternal(paymentConfig.getPaymentRetryDays().size());
+    }
+
+    @Test(groups = "fast")
+    public void testAbortedPayment() throws Exception {
+        testSchedulesRetryInternal(paymentConfig.getPaymentRetryDays().size() + 1);
+    }
+
+
+    private void testSchedulesRetryInternal(int maxTries) throws Exception {
+        
         final Account account = testHelper.createTestCreditCardAccount();
         final Invoice invoice = testHelper.createTestInvoice(account, clock.getUTCNow(), Currency.USD);
         final BigDecimal amount = new BigDecimal("10.00");
@@ -139,78 +166,66 @@ public class TestRetryService {
         mockPaymentProviderPlugin.makeNextPaymentFail();
         boolean failed = false;
         try {
-            paymentApi.createPayment(account.getExternalKey(), invoice.getId(), context);
+            paymentProcessor.createPayment(account.getExternalKey(), invoice.getId(), amount, context, false);
         } catch (PaymentApiException e) {
             failed = true;
         }
         assertTrue(failed);
 
-        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
-
-        assertEquals(pendingNotifications.size(), 1);
-
-        Notification notification = pendingNotifications.get(0);
-        // STEPH
-        /*
-        List<PaymentAttempt> paymentAttempts = paymentApi.getPaymentAttemptsForInvoiceId(invoice.getId());
-
-        assertNotNull(paymentAttempts);
-        assertEquals(notification.getNotificationKey(), paymentAttempts.get(0).getId().toString());
-
-        DateTime expectedRetryDate = paymentAttempts.get(0).getPaymentAttemptDate().plusDays(paymentConfig.getPaymentRetryDays().get(0));
-
-        assertEquals(notification.getEffectiveDate(), expectedRetryDate);
-        */
-    }
-
-    @Test(enabled = false)
-    public void testRetries() throws Exception {
-        final Account account = testHelper.createTestCreditCardAccount();
-        final Invoice invoice = testHelper.createTestInvoice(account, clock.getUTCNow(), Currency.USD);
-        final BigDecimal amount = new BigDecimal("10.00");
-        final UUID subscriptionId = UUID.randomUUID();
-        final UUID bundleId = UUID.randomUUID();
-
-        final DateTime now = clock.getUTCNow();
 
-        invoice.addInvoiceItem(new MockRecurringInvoiceItem(invoice.getId(),
-                                                       account.getId(),
-                                                       subscriptionId,
-                                                       bundleId,
-                                                       "test plan", "test phase",
-                                                       now,
-                                                       now.plusMonths(1),
-                                                       amount,
-                                                       new BigDecimal("1.0"),
-                                                       Currency.USD));
-
-        int numberOfDays = paymentConfig.getPaymentRetryDays().get(0);
-        DateTime nextRetryDate = now.plusDays(numberOfDays);
-        // STEPH
-        /*
-        PaymentAttempt paymentAttempt = new DefaultPaymentAttempt(UUID.randomUUID(), invoice, PaymentAttemptStatus.COMPLETED_FAILED).cloner()
-                                                                                      .setRetryCount(1)
-                                                                                      .setPaymentAttemptDate(now)
-                                                                                      .build();
-
-        paymentDao.createPaymentAttempt(paymentAttempt, PaymentAttemptStatus.COMPLETED_FAILED,  context);
-        retryService.scheduleRetry(paymentAttempt, nextRetryDate);
-        ((ClockMock)clock).setDeltaFromReality(Days.days(numberOfDays).toStandardSeconds().getSeconds() * 1000);
-        Thread.sleep(2000);
-
-        List<Notification> pendingNotifications = mockNotificationQueue.getPendingEvents();
-        assertEquals(pendingNotifications.size(), 0);
-
-
-        List<PaymentInfoEvent> paymentInfoList = paymentApi.getPaymentInfo(Arrays.asList(invoice.getId()));
-        assertEquals(paymentInfoList.size(), 1);
-
-        PaymentInfoEvent paymentInfo = paymentInfoList.get(0);
-        assertEquals(paymentInfo.getStatus(), PaymentStatus2.Processed.toString());
+        //int maxTries = paymentAborted ? paymentConfig.getPaymentRetryDays().size() + 1 : 1;
+        
+        for (int curFailure = 0; curFailure < maxTries; curFailure++) {
+
+            if (curFailure < maxTries - 1) {
+                mockPaymentProviderPlugin.makeNextPaymentFail();
+            }
+
+            if (curFailure < paymentConfig.getPaymentRetryDays().size()) {
+                
+                int nbDays = paymentConfig.getPaymentRetryDays().get(curFailure);            
+                clock.addDays(nbDays + 1);
+
+                try {
+                    await().atMost(3, SECONDS).until(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            Payment payment = getPaymentForInvoice(invoice.getId());
+                            return payment.getPaymentStatus() == PaymentStatus.SUCCESS;
+                        }
+                    });
+                } catch (TimeoutException e) {
+                    if (curFailure == maxTries - 1) {
+                        fail("Failed to find succesful payment for attempt " + (curFailure + 1) + "/" + maxTries);
+                    }
+                }
+            }
+        }
 
-        List<PaymentAttempt> updatedAttempts = paymentApi.getPaymentAttemptsForInvoiceId(invoice.getId());
-        assertEquals(paymentInfo.getId(), updatedAttempts.get(0).getPaymentId());
-        */
 
+        Payment payment = getPaymentForInvoice(invoice.getId());
+        List<PaymentAttempt> attempts = payment.getAttempts();
+        
+        int expectedAttempts = maxTries < paymentConfig.getPaymentRetryDays().size() ? maxTries + 1 : paymentConfig.getPaymentRetryDays().size() + 1;
+        assertEquals(attempts.size(), expectedAttempts);
+        Collections.sort(attempts, new Comparator<PaymentAttempt>() {
+            @Override
+            public int compare(PaymentAttempt o1, PaymentAttempt o2) {
+                return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+            }
+        });
+ 
+        for (int i = 0; i < attempts.size(); i++) {
+            PaymentAttempt cur = attempts.get(i);
+            if (i < attempts.size() - 1) {
+                assertEquals(cur.getPaymentStatus(), PaymentStatus.ERROR);
+            } else if (maxTries <= paymentConfig.getPaymentRetryDays().size()) {
+                assertEquals(cur.getPaymentStatus(), PaymentStatus.SUCCESS);
+                assertEquals(payment.getPaymentStatus(), PaymentStatus.SUCCESS);
+            } else {
+                assertEquals(cur.getPaymentStatus(), PaymentStatus.ERROR);      
+                assertEquals(payment.getPaymentStatus(), PaymentStatus.ABORTED);                
+            }
+        }
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 0ffe882..2a9e149 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -115,7 +115,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             });
         }
         try {
-            boolean success = doneInitialization.await(sleepTimeMs, TimeUnit.MILLISECONDS);
+            boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
             if (!success) {
                 
                 log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));