killbill-aplcache

Rewrote one more time the createPayment routine to handle various

6/2/2012 10:32:52 PM

Changes

Details

diff --git a/api/src/main/java/com/ning/billing/config/PaymentConfig.java b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
index f606039..f934906 100644
--- a/api/src/main/java/com/ning/billing/config/PaymentConfig.java
+++ b/api/src/main/java/com/ning/billing/config/PaymentConfig.java
@@ -34,6 +34,18 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig  {
     @Default("8,8,8")
     public List<Integer> getPaymentRetryDays();
 
+    @Config("killbill.payment.failure.retry.start.sec")
+    @Default("300")
+    public int getPaymentFailureRetryStart();
+
+    @Config("killbill.payment.failure.retry.multiplier")
+    @Default("2")
+    public int getPaymentFailureRetryMultiplier();
+
+    @Config("killbill.payment.failure.retry.max.attempts")
+    @Default("8")
+    public int getPaymentFailureRetryMaxAttempts();
+
 	@Override
     @Config("killbill.payment.engine.notifications.sleep")
     @Default("500")
diff --git a/api/src/main/java/com/ning/billing/ErrorCode.java b/api/src/main/java/com/ning/billing/ErrorCode.java
index 6081d19..66ac58b 100644
--- a/api/src/main/java/com/ning/billing/ErrorCode.java
+++ b/api/src/main/java/com/ning/billing/ErrorCode.java
@@ -226,7 +226,7 @@ public enum ErrorCode {
     PAYMENT_NULL_INVOICE(7015, "Invoice %s has a balance <= 0 "),      
     PAYMENT_AMOUNT_DENIED(7016, "Payment amount requested for invoice %s is greater than invoice balance [%f/%f]"),         
     PAYMENT_INTERNAL_ERROR(7017, "Internal payment error : %s"),
-    PAYMENT_PLUGIN_TIMEOUT(7017, "Plugin timeout "),    
+    PAYMENT_PLUGIN_TIMEOUT(7018, "Plugin timeout for account %s and invoice %s"),    
     /*
     *
     * Range 9000: Miscellaneous
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java b/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
index 9e44ff7..ad77bdc 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentStatus.java
@@ -15,10 +15,24 @@
  */
 package com.ning.billing.payment.api;
 
+// STEPH is that the enum we want to export? seems to internal
 public enum PaymentStatus {
+    /* Success! */
+    SUCCESS,
+    /* Initial status for Payment and PaymentAttempt */
     UNKNOWN,
+    /* Status for Payment when AUTO_PAY_OFF is turned on */
     AUTO_PAY_OFF,
-    ERROR,
-    SUCCESS,
-    ABORTED
+    /* Status for Payment and PaymentAttempt when the plugin failed to make the Payment and we will schedule a FailedPaymentRetry */    
+    PAYMENT_FAILURE,
+    /* Payment failure , we already retried a maximum of time */
+    PAYMENT_FAILURE_ABORTED,
+    /* Exception from plugin, state is unknown and needs to be retried */
+    PLUGIN_FAILURE,
+    /* Exception from plugin, we already retried a maximum of time */
+    PLUGIN_FAILURE_ABORTED,    
+    /* PaymentAttenmpt timedout; When TimedoutPaymentRetry kicks in, it check moves the state to TIMEDOUT if this is still in UNKNWON state */    
+    TIMEDOUT,
+    /* Status for Payment and PaymentAttempt all TimedoutPaymentRetry failed */
+    TIMEDOUT_ABORTED,    
 }
diff --git a/api/src/main/java/com/ning/billing/payment/plugin/api/PaymentInfoPlugin.java b/api/src/main/java/com/ning/billing/payment/plugin/api/PaymentInfoPlugin.java
index aac15fa..18e4d5e 100644
--- a/api/src/main/java/com/ning/billing/payment/plugin/api/PaymentInfoPlugin.java
+++ b/api/src/main/java/com/ning/billing/payment/plugin/api/PaymentInfoPlugin.java
@@ -35,5 +35,7 @@ public interface PaymentInfoPlugin {
 
     public PaymentPluginStatus getStatus();
     
-    public String getError();
+    public String getGatewayError();
+    
+    public String getGatewayErrorCode();
 }
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index eadc1fb..dcdc17b 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -165,7 +165,7 @@ public class TestOverdueIntegration extends TestIntegrationBase {
     @Test(groups={"slow"}, enabled = true)
     public void testBasicOverdueState() throws Exception {
         clock.setTime(new DateTime(2012, 5, 1, 0, 3, 42, 0));
-        paymentPlugin.makeAllInvoicesFail(true);
+        paymentPlugin.makeAllInvoicesFailWithException(true);
         
         // set next invoice to fail and create network 
         busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.INVOICE);
@@ -217,7 +217,7 @@ public class TestOverdueIntegration extends TestIntegrationBase {
         // should now be in OD2 state once the update is processed
         checkODState("OD3");
         
-        paymentPlugin.makeAllInvoicesFail(false);
+        paymentPlugin.makeAllInvoicesFailWithException(false);
         Collection<Invoice> invoices = invoiceApi.getUnpaidInvoicesByAccountId(account.getId(), clock.getUTCNow());
         List<String> invoiceIds = new ArrayList<String>();
         for (Invoice invoice : invoices) {
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 81ba766..03d4330 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -68,7 +68,13 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
         try {
             checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
                 DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
-            checkOverdueQueue.removeNotificationsByKey(overdueable.getId());
+            NotificationKey key = new NotificationKey() {
+                @Override
+                public String toString() {
+                    return overdueable.getId().toString();
+                }
+            };
+            checkOverdueQueue.removeNotificationsByKey(key);
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to clear items from a non-existent queue (DefaultOverdueCheck).", e);
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
index 5757a49..9578eed 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/AccountProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.ning.billing.payment.core;
 
-import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 import java.util.concurrent.ExecutorService;
 
@@ -42,7 +42,7 @@ public class AccountProcessor extends ProcessorBase {
             final AccountUserApi accountUserApi,
             final Bus eventBus,
             final GlobalLocker locker,
-            @Named(PLUGIN_EXECUTOR) final ExecutorService executor) {
+            @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor) {
         super(pluginRegistry, accountUserApi, eventBus, locker, executor);        
     }
     
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
index 1a5f630..a10e226 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.ning.billing.payment.core;
 
-import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -41,7 +41,7 @@ public class PaymentMethodProcessor extends ProcessorBase {
             final AccountUserApi accountUserApi,
             final Bus eventBus,
             final GlobalLocker locker,
-            @Named(PLUGIN_EXECUTOR)  final ExecutorService executor) {
+            @Named(PLUGIN_EXECUTOR_NAMED)  final ExecutorService executor) {
         super(pluginRegistry, accountUserApi, eventBus, locker, executor);
     }
     
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index 5791a33..229850f 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.ning.billing.payment.core;
 
-import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 import java.math.BigDecimal;
 import java.math.RoundingMode;
@@ -23,13 +23,15 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
 
 import javax.inject.Inject;
 
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.inject.name.Named;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
@@ -51,9 +53,9 @@ import com.ning.billing.payment.dispatcher.PluginDispatcher;
 import com.ning.billing.payment.plugin.api.PaymentInfoPlugin;
 import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
 import com.ning.billing.payment.plugin.api.PaymentProviderPlugin;
-import com.ning.billing.payment.plugin.api.PaymentInfoPlugin.PaymentPluginStatus;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
+import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.bus.BusEvent;
 import com.ning.billing.util.callcontext.CallContext;
@@ -66,7 +68,8 @@ import com.ning.billing.util.globallocker.GlobalLocker;
 public class PaymentProcessor extends ProcessorBase {
     
     private final InvoicePaymentApi invoicePaymentApi;
-    private final FailedPaymentRetryServiceScheduler retryService;
+    private final FailedPaymentRetryServiceScheduler failedPaymentRetryService;
+    private final PluginFailureRetryServiceScheduler pluginFailureRetryService;
     private final PaymentConfig config;
     private final PaymentDao paymentDao;
     private final CallContextFactory factory;
@@ -81,17 +84,19 @@ public class PaymentProcessor extends ProcessorBase {
     public PaymentProcessor(final PaymentProviderPluginRegistry pluginRegistry,
             final AccountUserApi accountUserApi,
             final InvoicePaymentApi invoicePaymentApi,
-            final FailedPaymentRetryServiceScheduler retryService,
+            final FailedPaymentRetryServiceScheduler failedPaymentRetryService,
+            final PluginFailureRetryServiceScheduler pluginFailureRetryService,
             final PaymentDao paymentDao,
             final PaymentConfig config,
             final Bus eventBus,
             final Clock clock,
             final GlobalLocker locker,
-            @Named(PLUGIN_EXECUTOR) final ExecutorService executor,            
+            @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,            
             final CallContextFactory factory) {
         super(pluginRegistry, accountUserApi, eventBus, locker, executor);
         this.invoicePaymentApi = invoicePaymentApi;
-        this.retryService = retryService;
+        this.failedPaymentRetryService = failedPaymentRetryService;
+        this.pluginFailureRetryService = pluginFailureRetryService;
         this.paymentDao = paymentDao;
         this.clock = clock;
         this.config = config;
@@ -135,24 +140,37 @@ public class PaymentProcessor extends ProcessorBase {
 
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
 
-        return paymentPluginDispatcher.dispatchWithAccountLock(new WithAccountLockAndTimeout<Payment>(locker,
-                account.getExternalKey(),
-                new WithAccountLockCallback<Payment>() {
+        try {
+            return paymentPluginDispatcher.dispatchWithAccountLock(new CallableWithAccountLock<Payment>(locker,
+                    account.getExternalKey(),
+                    new WithAccountLockCallback<Payment>() {
 
-            @Override
-            public Payment doOperation() throws PaymentApiException {
-                final Invoice invoice = invoicePaymentApi.getInvoice(invoiceId);
+                @Override
+                public Payment doOperation() throws PaymentApiException {
+                    final Invoice invoice = invoicePaymentApi.getInvoice(invoiceId);
 
-                if (invoice.isMigrationInvoice()) {
-                    log.error("Received invoice for payment that is a migration invoice - don't know how to handle those yet: {}", invoice);
-                    return null;
-                }
+                    if (invoice.isMigrationInvoice()) {
+                        log.error("Received invoice for payment that is a migration invoice - don't know how to handle those yet: {}", invoice);
+                        return null;
+                    }
 
-                BigDecimal requestedAmount = getAndValidatePaymentAmount(invoice, inputAmount, isInstantPayment);
-                return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                    BigDecimal requestedAmount = getAndValidatePaymentAmount(invoice, inputAmount, isInstantPayment);
+                    return processNewPaymentWithAccountLocked(plugin, account, invoice, requestedAmount, isInstantPayment, context);
+                }
+            }));
+        } catch (TimeoutException e) {
+            if (isInstantPayment) {
+                throw new PaymentApiException(ErrorCode.PAYMENT_PLUGIN_TIMEOUT, account.getId(), invoiceId);
+            } else {
+                // If we don't crash, plugin thread will complete (and set the correct status)
+                // If we crash before plugin thread completes, we may end up with a UNKNOWN Payment
+                // We would like to return an error so the Bus can retry but limited bu Guava bug
+                return null;
             }
-        }));
+        }
     }
+    
+
 
     private BigDecimal getAndValidatePaymentAmount(final Invoice invoice,  final BigDecimal inputAmount, final boolean isInstantPayment)
     throws PaymentApiException {
@@ -170,30 +188,55 @@ public class PaymentProcessor extends ProcessorBase {
     }
 
 
+    public void retryPluginFailure(final UUID paymentId) {
+        retryFailedPaymentInternal(paymentId, PaymentStatus.PLUGIN_FAILURE, PaymentStatus.TIMEDOUT);
+    }
+    
     public void retryFailedPayment(final UUID paymentId) {
+        retryFailedPaymentInternal(paymentId, PaymentStatus.PAYMENT_FAILURE);    
+    }
+    
+    private void retryFailedPaymentInternal(final UUID paymentId, final PaymentStatus...expectedPaymentStates) {
 
         try {
-            final PaymentModelDao payment = paymentDao.getPayment(paymentId);
+            
+            PaymentModelDao payment = paymentDao.getPayment(paymentId);
             if (payment == null) {
                 log.error("Invalid retry for non existnt paymentId {}", paymentId);
                 return;
             }
-            
+
             final Account account = accountUserApi.getAccountById(payment.getAccountId());
             final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
             final CallContext context = factory.createCallContext("PaymentRetry", CallOrigin.INTERNAL, UserType.SYSTEM);
             
-            voidPluginDispatcher.dispatchWithAccountLock(new WithAccountLockAndTimeout<Void>(locker,
+            voidPluginDispatcher.dispatchWithAccountLock(new CallableWithAccountLock<Void>(locker,
                     account.getExternalKey(),
                     new WithAccountLockCallback<Void>() {
 
                 @Override
                 public Void doOperation() throws PaymentApiException {
+
+                    // Fetch gain with account lock this time
+                    PaymentModelDao payment = paymentDao.getPayment(paymentId);
+                    boolean foundExpectedState = false;
+                    for (PaymentStatus cur : expectedPaymentStates) {
+                        if (payment.getPaymentStatus() == cur) {
+                            foundExpectedState = true;
+                            break;
+                        }
+                    }
+                    if (!foundExpectedState) {
+                        log.info("Aborted retry for payment {} because it is {} state", paymentId, payment.getPaymentStatus());
+                        return null;
+                    }
+
                     final Invoice invoice = invoicePaymentApi.getInvoice(payment.getInvoiceId());
                     if (invoice.isMigrationInvoice()) {
                         return null;
                     }
                     if (invoice.getBalance().compareTo(BigDecimal.ZERO) <= 0 ) {
+                        log.info("Aborted retry for payment {} because invoice has been paid", paymentId);
                         return null;
                     }
                     processRetryPaymentWithAccountLocked(plugin, account, invoice, payment, invoice.getBalance(), context);
@@ -201,35 +244,35 @@ public class PaymentProcessor extends ProcessorBase {
 
                 }
             }));
-
         } catch (AccountApiException e) {
             log.error(String.format("Failed to retry payment for paymentId %s", paymentId), e);
         } catch (PaymentApiException e) {
             log.info(String.format("Failed to retry payment for paymentId %s", paymentId));
+        } catch (TimeoutException e) {
+            // STEPH we should throw some exception so NotificationQ does not clear status and retries us
         }
     }
 
-   
-
     private Payment processNewPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice,
             BigDecimal requestedAmount, boolean isInstantPayment, CallContext context) throws PaymentApiException {
         
+        final boolean scheduleRetryForPayment = !isInstantPayment;
         PaymentModelDao payment = new PaymentModelDao(account.getId(), invoice.getId(), requestedAmount.setScale(2, RoundingMode.HALF_EVEN), invoice.getCurrency(), invoice.getTargetDate());
         PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
-            
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
+        
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, scheduleRetryForPayment, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, savedPayment, attempt, isInstantPayment, context);
-
     }
     
     private Payment processRetryPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice, PaymentModelDao payment,
             BigDecimal requestedAmount, CallContext context) throws PaymentApiException {
-        
+        final boolean scheduleRetryForPayment = true;
         PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(account.getId(), invoice.getId(), payment.getId(), clock.getUTCNow(), requestedAmount);
-        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), attempt, scheduleRetryForPayment, context);
         return processPaymentWithAccountLocked(plugin, account, invoice, payment, attempt, false, context);
     }
 
+    
     private Payment processPaymentWithAccountLocked(PaymentProviderPlugin plugin, Account account, Invoice invoice,
             PaymentModelDao paymentInput, PaymentAttemptModelDao attemptInput, boolean isInstantPayment, CallContext context) throws PaymentApiException {
         
@@ -237,74 +280,100 @@ public class PaymentProcessor extends ProcessorBase {
         List<PaymentAttemptModelDao> allAttempts = null;
         PaymentAttemptModelDao lastAttempt = null;
         PaymentModelDao payment = null;
+        PaymentStatus paymentStatus = PaymentStatus.UNKNOWN;
         try {
-            
-            PaymentInfoPlugin paymentPluginInfo =  plugin.processPayment(account.getExternalKey(), paymentInput.getId(), attemptInput.getRequestedAmount());
-            
-            // STEPH check if plugin returns UNKNOWN (exception from plugin)
-            // Does plugin throws or returns ERROR?
-            PaymentStatus paymentStatus = paymentPluginInfo.getStatus() ==  PaymentPluginStatus.ERROR ? PaymentStatus.ERROR : PaymentStatus.SUCCESS;
-            
-            paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), paymentStatus, paymentPluginInfo.getError(), attemptInput.getId(), context);
 
-            allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
-            lastAttempt = allAttempts.get(allAttempts.size() - 1);
-            payment = paymentDao.getPayment(paymentInput.getId());
-            
-            invoicePaymentApi.notifyOfPaymentAttempt(invoice.getId(),
-                    paymentStatus == PaymentStatus.SUCCESS ? payment.getAmount() : null,
-                    paymentStatus == PaymentStatus.SUCCESS ? payment.getCurrency() : null,
-                    lastAttempt.getId(),
-                    lastAttempt.getEffectiveDate(),
-                    context);
-            
-            event = new DefaultPaymentInfoEvent(account.getId(),
-                    invoice.getId(), payment.getId(), payment.getAmount(), payment.getPaymentNumber(), paymentStatus, context.getUserToken(), payment.getEffectiveDate());
-      
-        } catch (PaymentPluginApiException e) {
-
-            final PaymentStatus errorStatus = isInstantPayment ? PaymentStatus.ABORTED : PaymentStatus.ERROR;
-            
-            paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), errorStatus, e.getMessage(), attemptInput.getId(), context);
-
-            allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
-            lastAttempt = allAttempts.get(allAttempts.size() - 1);
+            PaymentInfoPlugin paymentPluginInfo = plugin.processPayment(account.getExternalKey(), paymentInput.getId(), attemptInput.getRequestedAmount());
+            switch (paymentPluginInfo.getStatus()) {
+            case PROCESSED:
+                // Update Payment/PaymentAttempt status
+                paymentStatus = PaymentStatus.SUCCESS;
+                paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), paymentStatus, null, attemptInput.getId(), context);
+
+                // Fetch latest objects
+                allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
+                lastAttempt = allAttempts.get(allAttempts.size() - 1);
+                payment = paymentDao.getPayment(paymentInput.getId());
+                
+                // STEPH should we notify in failure case scenario as well?
+                invoicePaymentApi.notifyOfPaymentAttempt(invoice.getId(),
+                        paymentStatus == PaymentStatus.SUCCESS ? payment.getAmount() : null,
+                        paymentStatus == PaymentStatus.SUCCESS ? payment.getCurrency() : null,
+                        lastAttempt.getId(),
+                        lastAttempt.getEffectiveDate(),
+                        context);
+                
+                // Create Bus event
+                event = new DefaultPaymentInfoEvent(account.getId(),
+                        invoice.getId(), payment.getId(), payment.getAmount(), payment.getPaymentNumber(), paymentStatus, context.getUserToken(), payment.getEffectiveDate());
+                break;
+                
+            case ERROR:
+                // Schedule if non instant payment and max attempt for retry not reached yet
+                if (!isInstantPayment) {
+                    allAttempts = paymentDao.getAttemptsForPayment(paymentInput.getId());
+                    final int retryAttempt = getNumberAttemptsInState(paymentInput.getId(), allAttempts,
+                            PaymentStatus.UNKNOWN, PaymentStatus.PAYMENT_FAILURE);
+                    final boolean isScheduledForRetry = failedPaymentRetryService.scheduleRetry(paymentInput.getId(), retryAttempt);
+                    paymentStatus = isScheduledForRetry ? PaymentStatus.PAYMENT_FAILURE : PaymentStatus.PAYMENT_FAILURE_ABORTED; 
+                } else {
+                    paymentStatus = PaymentStatus.PAYMENT_FAILURE_ABORTED;
+                }
 
-            log.info(String.format("Could not process payment for account %s, invoice %s, error = %s",
-                    account.getId(), invoice.getId(), e.getMessage()));
-            
-            if (!isInstantPayment) {
-                scheduleRetry(paymentInput.getId(), lastAttempt.getEffectiveDate(), allAttempts.size(), context);
+                paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), paymentStatus, paymentPluginInfo.getGatewayError(), attemptInput.getId(), context);
+
+                log.info(String.format("Could not process payment for account %s, invoice %s, error = %s",
+                        account.getId(), invoice.getId(), paymentPluginInfo.getGatewayError()));
+                
+                event = new DefaultPaymentErrorEvent(account.getId(), invoice.getId(), paymentInput.getId(), paymentPluginInfo.getGatewayError(), context.getUserToken());                        
+                throw new PaymentApiException(ErrorCode.PAYMENT_CREATE_PAYMENT, account.getId(), paymentPluginInfo.getGatewayError());
+                
+            default:
+                String formatError = String.format("Plugin return status %s for payment %s", paymentPluginInfo.getStatus(), paymentInput.getId());
+                // This caught right below as a retryable Plugin failure
+                throw new PaymentPluginApiException("", formatError);
             }
             
-            event = new DefaultPaymentErrorEvent(account.getId(), invoice.getId(), paymentInput.getId(), e.getErrorMessage(), context.getUserToken());                        
-            throw new PaymentApiException(e, ErrorCode.PAYMENT_CREATE_PAYMENT, account.getId(), e.getMessage());
-
+        } catch (PaymentPluginApiException e) {
+            //
+            // An exception occurred, we are left in an unknown state, we need to schedule a retry
+            //
+            paymentStatus = isInstantPayment ? PaymentStatus.PAYMENT_FAILURE_ABORTED : scheduleRetryOnPluginFailure(paymentInput.getId());  
+            // STEPH message might need truncation to fit??
+            paymentDao.updateStatusForPaymentWithAttempt(paymentInput.getId(), paymentStatus, e.getMessage(), attemptInput.getId(), context);
+
+            throw new PaymentApiException(ErrorCode.PAYMENT_CREATE_PAYMENT, account.getId(), e.getMessage());
+            
         } finally {
-            postPaymentEvent(event, account.getId());
+            if (event != null) {
+                postPaymentEvent(event, account.getId());
+            }
         }
         return new DefaultPayment(payment, allAttempts);
     }
-
-    private void scheduleRetry(final UUID paymentId, final DateTime lastAttemptDate, final int numberAttempts, final CallContext context) {
-
-        final List<Integer> retryDays = config.getPaymentRetryDays();
-        int retryCount = numberAttempts - 1;
-        if (retryCount < retryDays.size()) {
-            int retryInDays = 0;
-            DateTime nextRetryDate = clock.getUTCNow();
-            try {
-                retryInDays = retryDays.get(retryCount);
-                nextRetryDate = nextRetryDate.plusDays(retryInDays);
-                retryService.scheduleRetry(paymentId, nextRetryDate);
-            } catch (NumberFormatException ex) {
-                log.error("Could not get retry day for retry count {}", retryCount);
-            }
-        } else if (retryCount == retryDays.size()) {
-            log.info("Last payment retry failed for {} ", paymentId);
-            paymentDao.updateStatusForPayment(paymentId, PaymentStatus.ABORTED, context);
-        } else {
-            log.error("Cannot update payment retry information because retry count is invalid {} ", retryCount);
+    
+    private PaymentStatus scheduleRetryOnPluginFailure(UUID paymentId) {
+        List<PaymentAttemptModelDao> allAttempts = paymentDao.getAttemptsForPayment(paymentId);
+        // STEPH unknown only?
+        final int retryAttempt = getNumberAttemptsInState(paymentId, allAttempts, PaymentStatus.UNKNOWN);
+        final boolean isScheduledForRetry = pluginFailureRetryService.scheduleRetry(paymentId, retryAttempt);
+        return isScheduledForRetry ? PaymentStatus.PLUGIN_FAILURE : PaymentStatus.PLUGIN_FAILURE_ABORTED; 
+    }
+    
+    private int getNumberAttemptsInState(final UUID paymentId, final List<PaymentAttemptModelDao> allAttempts, final PaymentStatus...statuses) {
+        if (allAttempts == null || allAttempts.size() == 0) {
+            return 0;
         }
+        return Collections2.filter(allAttempts, new Predicate<PaymentAttemptModelDao>() {
+            @Override
+            public boolean apply(PaymentAttemptModelDao input) {
+                for (PaymentStatus cur : statuses) {
+                    if (input.getPaymentStatus() == cur) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+        }).size();
     }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index f342094..080dc60 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -107,13 +107,13 @@ public abstract class ProcessorBase {
     
     
 
-    public static class WithAccountLockAndTimeout<T> implements Callable<T> {
+    public static class CallableWithAccountLock<T> implements Callable<T> {
         
         private final GlobalLocker locker;
         private final String accountExternalKey;
         private final WithAccountLockCallback<T> callback;
         
-        public WithAccountLockAndTimeout(final GlobalLocker locker,
+        public CallableWithAccountLock(final GlobalLocker locker,
                 final String accountExternalKey,
                 final WithAccountLockCallback<T> callback) {
             this.locker = locker;
@@ -127,6 +127,7 @@ public abstract class ProcessorBase {
         }
     }
     
+    // STEPH might not need that anymore
     public static class WithAccountLock<T> {
         
         public T processAccountWithLock(final GlobalLocker locker, final String accountExternalKey, final WithAccountLockCallback<T> callback)
diff --git a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
index 1831de3..a651839 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
@@ -15,7 +15,7 @@
  */
 package com.ning.billing.payment.core;
 
-import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR;
+import static com.ning.billing.payment.glue.PaymentModule.PLUGIN_EXECUTOR_NAMED;
 
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -39,7 +39,7 @@ public class RefundProcessor extends ProcessorBase {
             final AccountUserApi accountUserApi,
             final Bus eventBus,
             final GlobalLocker locker,
-            @Named(PLUGIN_EXECUTOR)  final ExecutorService executor) {
+            @Named(PLUGIN_EXECUTOR_NAMED)  final ExecutorService executor) {
         super(pluginRegistry, accountUserApi, eventBus, locker, executor);        
     }
     
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
index da3ae01..6ff1ae8 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/AuditedPaymentDao.java
@@ -23,8 +23,11 @@ import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.inject.Inject;
 import com.ning.billing.payment.api.PaymentStatus;
+import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
 import com.ning.billing.util.ChangeType;
 import com.ning.billing.util.callcontext.CallContext;
 import com.ning.billing.util.dao.EntityAudit;
@@ -36,19 +39,21 @@ public class AuditedPaymentDao implements PaymentDao {
     private final PaymentSqlDao paymentSqlDao;
     private final PaymentAttemptSqlDao paymentAttemptSqlDao;
     private final PaymentMethodSqlDao paymentMethodSqlDao;    
-
+    //private final TimedoutPaymentRetryServiceScheduler timedoutSchduler;
+    
     @Inject
-    public AuditedPaymentDao(IDBI dbi) {
+    public AuditedPaymentDao(IDBI dbi, PluginFailureRetryServiceScheduler timedoutSchduler) {
        this.paymentSqlDao = dbi.onDemand(PaymentSqlDao.class);
        this.paymentAttemptSqlDao = dbi.onDemand(PaymentAttemptSqlDao.class);
        this.paymentMethodSqlDao = dbi.onDemand(PaymentMethodSqlDao.class);
+      // this.timedoutSchduler = timedoutSchduler;
     }
     
 
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId,
-            final PaymentAttemptModelDao attempt, final CallContext context) {
+            final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
 
         return paymentAttemptSqlDao.inTransaction(new Transaction<PaymentAttemptModelDao, PaymentAttemptSqlDao>() {
             @Override
@@ -61,9 +66,10 @@ public class AuditedPaymentDao implements PaymentDao {
             }
         });
     }
-
+    
+    
     @Override
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final CallContext context) {
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao payment, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context) {
         
         return paymentSqlDao.inTransaction(new Transaction<PaymentModelDao, PaymentSqlDao>() {
 
@@ -77,6 +83,27 @@ public class AuditedPaymentDao implements PaymentDao {
             }
         });
     }
+
+    /*
+    private int getNbTimedoutAttemptsFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional) {
+        List<PaymentAttemptModelDao> attempts = transactional.getPaymentAttempts(paymentId.toString());
+        return Collections2.filter(attempts, new Predicate<PaymentAttemptModelDao>() {
+            @Override
+            public boolean apply(PaymentAttemptModelDao input) {
+                return input.getPaymentStatus() == PaymentStatus.TIMEDOUT;
+            }
+        }).size();
+    }
+
+    
+    private void scheduleTimeoutRetryFromTransaction(final UUID paymentId, final PaymentAttemptSqlDao transactional, final boolean scheduleTimeoutRetry) {
+
+        if (scheduleTimeoutRetry) { 
+            int retryAttempt = getNbTimedoutAttemptsFromTransaction(paymentId, transactional) + 1;
+            timedoutSchduler.scheduleRetryFromTransaction(paymentId, retryAttempt, transactional);
+        }
+    }
+*/
     
     
     private PaymentModelDao insertPaymentFromTransaction(final PaymentModelDao payment, final CallContext context, final PaymentSqlDao transactional) {
diff --git a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
index a951454..efbb252 100644
--- a/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/com/ning/billing/payment/dao/PaymentDao.java
@@ -26,9 +26,9 @@ import com.ning.billing.util.callcontext.CallContext;
 public interface PaymentDao {
 
     // STEPH do we need object returned?
-    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final CallContext context);
+    public PaymentModelDao insertPaymentWithAttempt(final PaymentModelDao paymentInfo, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
 
-    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final CallContext context);
+    public PaymentAttemptModelDao insertNewAttemptForPayment(final UUID paymentId, final PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, final CallContext context);
 
 
     public void updateStatusForPayment(final UUID paymentId, final PaymentStatus paymentStatus, final CallContext context);    
diff --git a/payment/src/main/java/com/ning/billing/payment/dispatcher/PluginDispatcher.java b/payment/src/main/java/com/ning/billing/payment/dispatcher/PluginDispatcher.java
new file mode 100644
index 0000000..a439b19
--- /dev/null
+++ b/payment/src/main/java/com/ning/billing/payment/dispatcher/PluginDispatcher.java
@@ -0,0 +1,68 @@
+/* 
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.ning.billing.payment.dispatcher;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.ErrorCode;
+import com.ning.billing.payment.api.PaymentApiException;
+
+public class PluginDispatcher<T> {
+    
+    private static final Logger log = LoggerFactory.getLogger(PluginDispatcher.class);
+    
+    private final long DEFAULT_PLUGIN_TIMEOUT_SEC = 30;
+    private final TimeUnit DEEFAULT_PLUGIN_TIMEOUT_UNIT = TimeUnit.SECONDS;
+    
+    private final ExecutorService executor;
+    
+    public PluginDispatcher(ExecutorService executor) {
+        this.executor = executor;
+    }
+    
+    public T dispatchWithAccountLock(Callable<T> task) 
+        throws PaymentApiException, TimeoutException {
+        return dispatchWithAccountLockAndTimeout(task, DEFAULT_PLUGIN_TIMEOUT_SEC, DEEFAULT_PLUGIN_TIMEOUT_UNIT);
+    }
+    
+    public T dispatchWithAccountLockAndTimeout(Callable<T> task, long timeout, TimeUnit unit)
+    throws PaymentApiException, TimeoutException  {
+
+        try {
+            Future<T> future = executor.submit(task);
+            return future.get(timeout, unit);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof PaymentApiException) {
+                throw (PaymentApiException) e.getCause();
+            } else {
+                throw new PaymentApiException(ErrorCode.PAYMENT_INTERNAL_ERROR, e.getMessage());
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PaymentApiException(ErrorCode.PAYMENT_INTERNAL_ERROR, e.getMessage());
+        } 
+    }
+    
+  
+}
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index 904697e..f6ef52d 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -26,7 +26,7 @@ import com.ning.billing.payment.InvoiceHandler;
 import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
-import com.ning.billing.payment.retry.TimedoutPaymentRetryService;
+import com.ning.billing.payment.retry.PluginFailureRetryService;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
@@ -42,11 +42,11 @@ public class DefaultPaymentService implements PaymentService {
     private final Bus eventBus;
     private final PaymentApi api;
     private final FailedPaymentRetryService failedRetryService;
-    private final TimedoutPaymentRetryService timedoutRetryService;
+    private final PluginFailureRetryService timedoutRetryService;
 
     @Inject
     public DefaultPaymentService(final InvoiceHandler requestProcessor, final PaymentApi api, final Bus eventBus,
-            final FailedPaymentRetryService failedRetryService, final TimedoutPaymentRetryService timedoutRetryService) {
+            final FailedPaymentRetryService failedRetryService, final PluginFailureRetryService timedoutRetryService) {
         this.requestProcessor = requestProcessor;
         this.eventBus = eventBus;
         this.api = api;
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
index 0085b88..eee9302 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/PaymentModule.java
@@ -40,13 +40,15 @@ import com.ning.billing.payment.provider.DefaultPaymentProviderPluginRegistry;
 import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
 import com.ning.billing.payment.retry.FailedPaymentRetryService;
 import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
-import com.ning.billing.payment.retry.TimedoutPaymentRetryService;
+import com.ning.billing.payment.retry.PluginFailureRetryService;
+import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
 
 public class PaymentModule extends AbstractModule {
     
     private final static int PLUGIN_NB_THREADS = 3;
     private final static String PLUGIN_THREAD_PREFIX = "Plugin-th-";
-    public final static String PLUGIN_EXECUTOR = "PluginExecutor";
+    
+    public final static String PLUGIN_EXECUTOR_NAMED = "PluginExecutor";
     
 
     private final Properties props;
@@ -68,8 +70,9 @@ public class PaymentModule extends AbstractModule {
 
     protected void installRetryEngines() {
         bind(FailedPaymentRetryService.class).asEagerSingleton();
-        bind(TimedoutPaymentRetryService.class).asEagerSingleton(); 
+        bind(PluginFailureRetryService.class).asEagerSingleton(); 
         bind(FailedPaymentRetryServiceScheduler.class).asEagerSingleton();
+        bind(PluginFailureRetryServiceScheduler.class).asEagerSingleton();        
     }
     
     
@@ -83,7 +86,7 @@ public class PaymentModule extends AbstractModule {
                 return th;
             }
         });
-        bind(ExecutorService.class).annotatedWith(Names.named(PLUGIN_EXECUTOR)).toInstance(pluginExecutorService);
+        bind(ExecutorService.class).annotatedWith(Names.named(PLUGIN_EXECUTOR_NAMED)).toInstance(pluginExecutorService);
         bind(AccountProcessor.class).asEagerSingleton();
         bind(PaymentProcessor.class).asEagerSingleton();
         bind(RefundProcessor.class).asEagerSingleton();
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
index 98ca014..dcc24a8 100644
--- a/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
+++ b/payment/src/main/java/com/ning/billing/payment/provider/NoOpPaymentProviderPlugin.java
@@ -65,8 +65,13 @@ public class NoOpPaymentProviderPlugin implements PaymentProviderPlugin {
             public PaymentPluginStatus getStatus() {
                 return PaymentPluginStatus.PROCESSED;
             }
+
+            @Override
+            public String getGatewayError() {
+                return null;
+            }
             @Override
-            public String getError() {
+            public String getGatewayErrorCode() {
                 return null;
             }
         };
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index 5ea5125..e9600df 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -18,6 +18,7 @@ package com.ning.billing.payment.retry;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,8 +87,33 @@ public abstract class BaseRetryService implements RetryService {
             this.notificationQueueService = notificationQueueService;
         }
     
+        public boolean scheduleRetryFromTransaction(final UUID paymentId, final DateTime timeOfRetry, final Transmogrifier transactionalDao) {
+            return scheduleRetryInternal(paymentId, timeOfRetry, transactionalDao);
+        }
+        
+        public boolean scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
+            return scheduleRetryInternal(paymentId, timeOfRetry, null);
+        }
         
-        public void scheduleRetry(final UUID paymentId, final DateTime timeOfRetry) {
+        // STEPH TimedoutPaymentRetryServiceScheduler
+        public void cancelAllScheduleRetryForKey(final UUID paymentId) {
+            /*
+            try {
+                NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, getQueueName());
+                NotificationKey key = new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return paymentId.toString();
+                    }
+                };
+                retryQueue.removeNotificationsByKey(key);
+            } catch (NoSuchNotificationQueue e) {
+                log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
+            }
+            */
+        }
+        
+        private boolean scheduleRetryInternal(final UUID paymentId, final DateTime timeOfRetry, final Transmogrifier transactionalDao) {
 
             try {
                 NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, getQueueName());
@@ -98,11 +124,17 @@ public abstract class BaseRetryService implements RetryService {
                     }
                 };
                 if (retryQueue != null) {
-                    retryQueue.recordFutureNotification(timeOfRetry, key);
+                    if (transactionalDao == null) {
+                        retryQueue.recordFutureNotification(timeOfRetry, key);
+                    } else {
+                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key);
+                    }
                 }
             } catch (NoSuchNotificationQueue e) {
                 log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
+                return false;
             }
+            return true;
         }
         public abstract String getQueueName();
     }
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index b6807e5..0abf68e 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -16,13 +16,13 @@
 
 package com.ning.billing.payment.retry;
 
+import java.util.List;
 import java.util.UUID;
 
-import com.ning.billing.util.callcontext.CallContext;
-import com.ning.billing.util.callcontext.CallOrigin;
-import com.ning.billing.util.callcontext.DefaultCallContext;
-import com.ning.billing.util.callcontext.UserType;
+
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,24 +30,15 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.ning.billing.account.api.AccountUserApi;
 import com.ning.billing.config.PaymentConfig;
-import com.ning.billing.payment.api.PaymentApiException;
 import com.ning.billing.payment.core.PaymentProcessor;
 import com.ning.billing.payment.dao.PaymentDao;
-import com.ning.billing.payment.glue.DefaultPaymentService;
 
 
-import com.ning.billing.util.notificationq.NotificationKey;
-import com.ning.billing.util.notificationq.NotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
-import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-
 public class FailedPaymentRetryService extends BaseRetryService implements RetryService {
     
     private static final Logger log = LoggerFactory.getLogger(FailedPaymentRetryService.class);
     
-    public static final String QUEUE_NAME = "failed-retry";
+    public static final String QUEUE_NAME = "failed-payment";
 
     private final PaymentProcessor paymentProcessor;
 
@@ -73,10 +64,44 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
     
     public static class FailedPaymentRetryServiceScheduler extends RetryServiceScheduler {
         
+        private final PaymentConfig config;
+        private final Clock clock;
+        
         @Inject
-        public FailedPaymentRetryServiceScheduler(final NotificationQueueService notificationQueueService) {
+        public FailedPaymentRetryServiceScheduler(final NotificationQueueService notificationQueueService,
+                final Clock clock, final PaymentConfig config) {
             super(notificationQueueService);
+            this.config = config;
+            this.clock = clock;
         }
+        
+        public boolean scheduleRetry(final UUID paymentId, final int retryAttempt) {
+            DateTime timeOfRetry = getNextRetryDate(retryAttempt);
+            if (timeOfRetry == null) {
+                return false;
+            }
+            return scheduleRetry(paymentId, timeOfRetry);
+        }
+        
+        
+        private DateTime getNextRetryDate(int retryAttempt) {
+
+            DateTime result = null;
+            final List<Integer> retryDays = config.getPaymentRetryDays();
+            int retryCount = retryAttempt - 1;
+            if (retryCount < retryDays.size()) {
+                int retryInDays = 0;
+                DateTime nextRetryDate = clock.getUTCNow();
+                try {
+                    retryInDays = retryDays.get(retryCount);
+                    result = nextRetryDate.plusDays(retryInDays);
+                } catch (NumberFormatException ex) {
+                    log.error("Could not get retry day for retry count {}", retryCount);
+                }
+            }
+            return result;            
+        }
+
 
         @Override
         public String getQueueName() {
diff --git a/payment/src/main/resources/com/ning/billing/payment/ddl.sql b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
index c7b11e2..3ee5a80 100644
--- a/payment/src/main/resources/com/ning/billing/payment/ddl.sql
+++ b/payment/src/main/resources/com/ning/billing/payment/ddl.sql
@@ -9,7 +9,7 @@ CREATE TABLE payments (
     amount decimal(8,2),
     currency char(3),    
     effective_date datetime,
-    payment_status varchar(32),  
+    payment_status varchar(50),  
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
     updated_by varchar(50) NOT NULL,
@@ -29,7 +29,7 @@ CREATE TABLE payment_history (
     amount decimal(8,2),
     currency char(3),    
     effective_date datetime,
-    payment_status varchar(32), 
+    payment_status varchar(50), 
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
     updated_by varchar(50) NOT NULL,
@@ -45,7 +45,7 @@ CREATE TABLE payment_attempts (
     id char(36) NOT NULL,
     payment_id char(36) COLLATE utf8_bin NOT NULL,
     payment_error varchar(256),              
-    processing_status varchar(20),
+    processing_status varchar(50),
     requested_amount decimal(8,2),      
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
@@ -63,7 +63,7 @@ CREATE TABLE payment_attempt_history (
     id char(36) NOT NULL,
     payment_id char(36) COLLATE utf8_bin NOT NULL,
     payment_error varchar(256),              
-    processing_status varchar(20),
+    processing_status varchar(50),
     requested_amount decimal(8,2),            
     created_by varchar(50) NOT NULL,
     created_date datetime NOT NULL,
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
index 7ab8b57..1760339 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/MockPaymentDao.java
@@ -31,7 +31,7 @@ public class MockPaymentDao implements PaymentDao {
     
     @Override
     public PaymentModelDao insertPaymentWithAttempt(PaymentModelDao paymentInfo, PaymentAttemptModelDao attempt,
-            CallContext context) {
+            final boolean scheduleTimeoutRetry, CallContext context) {
         synchronized(this) {
             payments.put(paymentInfo.getId(), paymentInfo);
             attempts.put(attempt.getId(), attempt);
@@ -41,7 +41,7 @@ public class MockPaymentDao implements PaymentDao {
 
     @Override
     public PaymentAttemptModelDao insertNewAttemptForPayment(UUID paymentId,
-            PaymentAttemptModelDao attempt, CallContext context) {
+            PaymentAttemptModelDao attempt, final boolean scheduleTimeoutRetry, CallContext context) {
         synchronized(this) {
             attempts.put(attempt.getId(), attempt);
         }
diff --git a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
index f81262a..99816db 100644
--- a/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/com/ning/billing/payment/dao/TestPaymentDao.java
@@ -66,7 +66,7 @@ public class TestPaymentDao {
         helper.initDb(paymentddl);
         helper.initDb(utilddl);
 
-        paymentDao = new AuditedPaymentDao(dbi);
+        paymentDao = new AuditedPaymentDao(dbi, null);
     }
     
     private void setupDb() {
@@ -103,7 +103,7 @@ public class TestPaymentDao {
         
         PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, true, context);
         
         PaymentStatus paymentStatus = PaymentStatus.SUCCESS;
         String paymentError = "No error";
@@ -146,7 +146,7 @@ public class TestPaymentDao {
         PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
         
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, attempt,true, context);
         assertEquals(savedPayment.getId(), payment.getId());
         assertEquals(savedPayment.getAccountId(), accountId);        
         assertEquals(savedPayment.getInvoiceId(), invoiceId);        
@@ -197,11 +197,11 @@ public class TestPaymentDao {
         
         PaymentModelDao payment = new PaymentModelDao(accountId, invoiceId, amount, currency, effectiveDate);
         PaymentAttemptModelDao firstAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), amount);
-        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, context);
+        PaymentModelDao savedPayment = paymentDao.insertPaymentWithAttempt(payment, firstAttempt, true,context);
         
         BigDecimal newAmount = new BigDecimal(15.23).setScale(2, RoundingMode.HALF_EVEN);
         PaymentAttemptModelDao secondAttempt = new PaymentAttemptModelDao(accountId, invoiceId, payment.getId(), clock.getUTCNow(), newAmount);        
-        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, context);
+        paymentDao.insertNewAttemptForPayment(payment.getId(), secondAttempt, true, context);
         
         List<PaymentModelDao> payments = paymentDao.getPaymentsForInvoice(invoiceId);
         assertEquals(payments.size(), 1);
diff --git a/payment/src/test/java/com/ning/billing/payment/plugin/api/MockPaymentInfoPlugin.java b/payment/src/test/java/com/ning/billing/payment/plugin/api/MockPaymentInfoPlugin.java
index 425adc6..0da00fb 100644
--- a/payment/src/test/java/com/ning/billing/payment/plugin/api/MockPaymentInfoPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/plugin/api/MockPaymentInfoPlugin.java
@@ -63,9 +63,13 @@ public class MockPaymentInfoPlugin implements PaymentInfoPlugin {
     public DateTime getCreatedDate() {
         return createdDate;
     }
-
     @Override
-    public String getError() {
+    public String getGatewayError() {
         return error;
     }
+
+    @Override
+    public String getGatewayErrorCode() {
+        return null;
+    }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 9bad840..e83e370 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -49,8 +49,9 @@ import com.ning.billing.util.clock.Clock;
 
 public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     
-    private final AtomicBoolean makeNextInvoiceFail = new AtomicBoolean(false);
-    private final AtomicBoolean makeAllInvoicesFail = new AtomicBoolean(false);
+    private final AtomicBoolean makeNextInvoiceFailWithError = new AtomicBoolean(false);
+    private final AtomicBoolean makeNextInvoiceFailWithException = new AtomicBoolean(false);
+    private final AtomicBoolean makeAllInvoicesFailWithException = new AtomicBoolean(false);
     private final Map<UUID, PaymentInfoPlugin> payments = new ConcurrentHashMap<UUID, PaymentInfoPlugin>();
     private final Map<String, PaymentProviderAccount> accounts = new ConcurrentHashMap<String, PaymentProviderAccount>();
     private final Map<String, PaymentMethodInfo> paymentMethods = new ConcurrentHashMap<String, PaymentMethodInfo>();
@@ -64,26 +65,32 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     
     
     public void clear() {
-        makeNextInvoiceFail.set(false);
-        makeAllInvoicesFail.set(false);
+        makeNextInvoiceFailWithException.set(false);
+        makeAllInvoicesFailWithException.set(false);
+        makeNextInvoiceFailWithError.set(false);
     }
     
+    public void makeNextPaymentFailWithError() {
+        makeNextInvoiceFailWithError.set(true);
+    }
 
-    public void makeNextPaymentFail() {
-        makeNextInvoiceFail.set(true);
+    
+    public void makeNextPaymentFailWithException() {
+        makeNextInvoiceFailWithException.set(true);
     }
 
-    public void makeAllInvoicesFail(boolean failure) {
-        makeAllInvoicesFail.set(failure);
+    public void makeAllInvoicesFailWithException(boolean failure) {
+        makeAllInvoicesFailWithException.set(failure);
     }
 
     @Override
     public PaymentInfoPlugin processPayment(String externalKey, UUID paymentId, BigDecimal amount) throws PaymentPluginApiException {
-        if (makeNextInvoiceFail.getAndSet(false) || makeAllInvoicesFail.get()) {
+        if (makeNextInvoiceFailWithException.getAndSet(false) || makeAllInvoicesFailWithException.get()) {
             throw new PaymentPluginApiException("", "test error");
         }
 
-        PaymentInfoPlugin result = new MockPaymentInfoPlugin(amount, clock.getUTCNow(), clock.getUTCNow(), PaymentPluginStatus.PROCESSED, null);
+        PaymentPluginStatus status = makeNextInvoiceFailWithError.getAndSet(false) ? PaymentPluginStatus.ERROR : PaymentPluginStatus.PROCESSED;
+        PaymentInfoPlugin result = new MockPaymentInfoPlugin(amount, clock.getUTCNow(), clock.getUTCNow(), status, null);
         payments.put(paymentId, result);
         return result;
     }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
index d7bd6c7..d083c9c 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
@@ -163,7 +163,7 @@ public class TestRetryService {
                                                        new BigDecimal("1.0"),
                                                        Currency.USD));
 
-        mockPaymentProviderPlugin.makeNextPaymentFail();
+        mockPaymentProviderPlugin.makeNextPaymentFailWithError();
         boolean failed = false;
         try {
             paymentProcessor.createPayment(account.getExternalKey(), invoice.getId(), amount, context, false);
@@ -178,7 +178,7 @@ public class TestRetryService {
         for (int curFailure = 0; curFailure < maxTries; curFailure++) {
 
             if (curFailure < maxTries - 1) {
-                mockPaymentProviderPlugin.makeNextPaymentFail();
+                mockPaymentProviderPlugin.makeNextPaymentFailWithError();
             }
 
             if (curFailure < paymentConfig.getPaymentRetryDays().size()) {
@@ -218,13 +218,13 @@ public class TestRetryService {
         for (int i = 0; i < attempts.size(); i++) {
             PaymentAttempt cur = attempts.get(i);
             if (i < attempts.size() - 1) {
-                assertEquals(cur.getPaymentStatus(), PaymentStatus.ERROR);
+                assertEquals(cur.getPaymentStatus(), PaymentStatus.PAYMENT_FAILURE);
             } else if (maxTries <= paymentConfig.getPaymentRetryDays().size()) {
                 assertEquals(cur.getPaymentStatus(), PaymentStatus.SUCCESS);
                 assertEquals(payment.getPaymentStatus(), PaymentStatus.SUCCESS);
             } else {
-                assertEquals(cur.getPaymentStatus(), PaymentStatus.ERROR);      
-                assertEquals(payment.getPaymentStatus(), PaymentStatus.ABORTED);                
+                assertEquals(cur.getPaymentStatus(), PaymentStatus.PAYMENT_FAILURE_ABORTED);      
+                assertEquals(payment.getPaymentStatus(), PaymentStatus.PAYMENT_FAILURE_ABORTED);                
             }
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 6b50914..2c6bb62 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -122,8 +122,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    public void removeNotificationsByKey(UUID key) {
-        dao.removeNotificationsByKey(key.toString());
+    public void removeNotificationsByKey(NotificationKey notificationKey) {
+        dao.removeNotificationsByKey(notificationKey.toString());
         
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 71735cd..a04ff45 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -52,7 +52,7 @@ public interface NotificationQueue extends QueueLifecycle {
      * 
      * @param key
      */
-    public void removeNotificationsByKey(UUID key);
+    public void removeNotificationsByKey(final NotificationKey notificationKey);
 
     /**
      * This is only valid when the queue has been configured with isNotificationProcessingOff is true
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 8e9cce4..fb1bfe7 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -113,7 +113,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void removeNotificationsByKey(UUID key) {
+    public void removeNotificationsByKey(NotificationKey key) {
         List<Notification> toClearNotifications = new ArrayList<Notification>();
         for (Notification notification : notifications) {
             if (notification.getNotificationKey().equals(key.toString())) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 50f0a68..5adac12 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -450,7 +450,7 @@ public class TestNotificationQueue {
         });
     
     
-      queue.removeNotificationsByKey(key); // should remove 2 of the 3
+      queue.removeNotificationsByKey(notificationKey); // should remove 2 of the 3
 
     // Move time in the future after the notification effectiveDate
         ((ClockMock) clock).setDeltaFromReality(4000000 + nextReadyTimeIncrementMs * 3 );