killbill-memoizeit

Add a payment janitor task to take care of incomplete attempt

7/10/2014 12:13:36 AM

Changes

Details

diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceInternalApi.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceInternalApi.java
index 729304b..cc1c061 100644
--- a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceInternalApi.java
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceInternalApi.java
@@ -46,6 +46,10 @@ public interface InvoiceInternalApi {
 
     public InvoicePayment getInvoicePaymentForAttempt(UUID paymentId, InternalTenantContext context) throws InvoiceApiException;
 
+    public InvoicePayment getInvoicePaymentForRefund(UUID paymentId, InternalTenantContext context) throws InvoiceApiException;
+
+    public InvoicePayment getInvoicePaymentForChargeback(UUID paymentId, InternalTenantContext context) throws InvoiceApiException;
+
     public Invoice getInvoiceForPaymentId(UUID paymentId, InternalTenantContext context) throws InvoiceApiException;
 
     /**
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/api/svcs/DefaultInvoiceInternalApi.java b/invoice/src/main/java/org/killbill/billing/invoice/api/svcs/DefaultInvoiceInternalApi.java
index 8b1aa7b..6d1330b 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/api/svcs/DefaultInvoiceInternalApi.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/api/svcs/DefaultInvoiceInternalApi.java
@@ -56,6 +56,7 @@ import org.killbill.billing.util.timezone.DateAndTimeZoneContext;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
 
 public class DefaultInvoiceInternalApi implements InvoiceInternalApi {
 
@@ -109,21 +110,17 @@ public class DefaultInvoiceInternalApi implements InvoiceInternalApi {
 
     @Override
     public InvoicePayment getInvoicePaymentForAttempt(final UUID paymentId, final InternalTenantContext context) throws InvoiceApiException {
-        final Collection<InvoicePayment> invoicePayments = Collections2.transform(dao.getInvoicePayments(paymentId, context), new Function<InvoicePaymentModelDao, InvoicePayment>() {
-            @Override
-            public InvoicePayment apply(final InvoicePaymentModelDao input) {
-                return new DefaultInvoicePayment(input);
-            }
-        });
-        if (invoicePayments.size() == 0) {
-            return null;
-        }
-        return Collections2.filter(invoicePayments, new Predicate<InvoicePayment>() {
-            @Override
-            public boolean apply(final InvoicePayment input) {
-                return input.getType() == InvoicePaymentType.ATTEMPT;
-            }
-        }).iterator().next();
+        return getInvoicePayment(paymentId, InvoicePaymentType.ATTEMPT, context);
+    }
+
+    @Override
+    public InvoicePayment getInvoicePaymentForRefund(final UUID paymentId, final InternalTenantContext context) throws InvoiceApiException {
+        return getInvoicePayment(paymentId, InvoicePaymentType.REFUND, context);
+    }
+
+    @Override
+    public InvoicePayment getInvoicePaymentForChargeback(final UUID paymentId, final InternalTenantContext context) throws InvoiceApiException {
+        return getInvoicePayment(paymentId, InvoicePaymentType.CHARGED_BACK, context);
     }
 
     @Override
@@ -151,4 +148,21 @@ public class DefaultInvoiceInternalApi implements InvoiceInternalApi {
         dao.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, context);
     }
 
+    private InvoicePayment getInvoicePayment(final UUID paymentId, final InvoicePaymentType type, final InternalTenantContext context) throws InvoiceApiException {
+        final Collection<InvoicePayment> invoicePayments = Collections2.transform(dao.getInvoicePayments(paymentId, context), new Function<InvoicePaymentModelDao, InvoicePayment>() {
+            @Override
+            public InvoicePayment apply(final InvoicePaymentModelDao input) {
+                return new DefaultInvoicePayment(input);
+            }
+        });
+        if (invoicePayments.size() == 0) {
+            return null;
+        }
+        return Iterables.tryFind(invoicePayments, new Predicate<InvoicePayment>() {
+            @Override
+            public boolean apply(final InvoicePayment input) {
+                return input.getType() == type;
+            }
+        }).orNull();
+    }
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
index 784a39a..20a220c 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
@@ -566,7 +566,17 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class).create(invoicePayment, context);
+                final InvoicePaymentSqlDao transactional = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
+                final List<InvoicePaymentModelDao> invoicePayments = transactional.getInvoicePayments(invoicePayment.getPaymentId().toString(), context);
+                final InvoicePaymentModelDao existingAttempt = Iterables.tryFind(invoicePayments, new Predicate<InvoicePaymentModelDao>() {
+                    @Override
+                    public boolean apply(final InvoicePaymentModelDao input) {
+                        return input.getType() == InvoicePaymentType.ATTEMPT;
+                    }
+                }).orNull();
+                if (existingAttempt == null) {
+                    transactional.create(invoicePayment, context);
+                }
                 return null;
             }
         });
diff --git a/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql b/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
index 6a08dc1..ed6f4b8 100644
--- a/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
+++ b/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
@@ -70,7 +70,7 @@ CREATE TABLE invoice_payments (
     PRIMARY KEY(record_id)
 ) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
 CREATE UNIQUE INDEX invoice_payments_id ON invoice_payments(id);
-CREATE INDEX invoice_payments ON invoice_payments(payment_id);
+CREATE UNIQUE INDEX invoice_payments ON invoice_payments(payment_id, type);
 CREATE INDEX invoice_payments_invoice_id ON invoice_payments(invoice_id);
 CREATE INDEX invoice_payments_reversals ON invoice_payments(linked_invoice_payment_id);
 CREATE INDEX invoice_payments_tenant_account_record_id ON invoice_payments(tenant_record_id, account_record_id);
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentApi.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentApi.java
index a79efe9..2d7cd3c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentApi.java
@@ -67,7 +67,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createAuthorization(IS_API_PAYMENT, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createAuthorization(IS_API_PAYMENT, null, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
                                                           SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
     }
 
@@ -82,7 +82,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createCapture(IS_API_PAYMENT, account, directPaymentId, amount, currency, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createCapture(IS_API_PAYMENT, null, account, directPaymentId, amount, currency, directPaymentTransactionExternalKey,
                                                     SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
     }
 
@@ -98,7 +98,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createPurchase(IS_API_PAYMENT, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createPurchase(IS_API_PAYMENT, null, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
                                                      SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
     }
 
@@ -138,7 +138,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkNotNullParameter(properties, "plugin properties");
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createVoid(IS_API_PAYMENT, account, directPaymentId, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createVoid(IS_API_PAYMENT, null, account, directPaymentId, directPaymentTransactionExternalKey,
                                                  SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
 
     }
@@ -155,7 +155,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createRefund(IS_API_PAYMENT, account, directPaymentId, amount, currency, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createRefund(IS_API_PAYMENT, null, account, directPaymentId, amount, currency, directPaymentTransactionExternalKey,
                                                    SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
     }
 
@@ -192,7 +192,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createCredit(IS_API_PAYMENT, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
+        return directPaymentProcessor.createCredit(IS_API_PAYMENT, null, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey,
                                                    SHOULD_LOCK_ACCOUNT, properties, callContext, internalCallContext);
 
     }
@@ -222,7 +222,7 @@ public class DefaultDirectPaymentApi implements DirectPaymentApi {
         checkPositiveAmount(amount);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
-        return directPaymentProcessor.createChargeback(IS_API_PAYMENT, account, directPaymentId, directPaymentTransactionExternalKey, amount, currency, true,
+        return directPaymentProcessor.createChargeback(IS_API_PAYMENT, null, account, directPaymentId, directPaymentTransactionExternalKey, amount, currency, true,
                                                        callContext, internalCallContext);
 
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentTransaction.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentTransaction.java
index 151fc0e..b3b7118 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentTransaction.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultDirectPaymentTransaction.java
@@ -38,11 +38,13 @@ public class DefaultDirectPaymentTransaction extends EntityBase implements Direc
     private final String gatewayErrorCode;
     private final String gatewayErrorMsg;
     private final PaymentTransactionInfoPlugin infoPlugin;
+    private final UUID attemptId;
 
-    public DefaultDirectPaymentTransaction(final UUID id, final String externalKey, final DateTime createdDate, final DateTime updatedDate, final UUID directPaymentId, final TransactionType transactionType,
+    public DefaultDirectPaymentTransaction(final UUID id, final UUID attemptId, final String externalKey, final DateTime createdDate, final DateTime updatedDate, final UUID directPaymentId, final TransactionType transactionType,
                                            final DateTime effectiveDate, final TransactionStatus status, final BigDecimal amount, final Currency currency, final BigDecimal processedAmount, final Currency processedCurrency,
                                            final String gatewayErrorCode, final String gatewayErrorMsg, final PaymentTransactionInfoPlugin infoPlugin) {
         super(id, createdDate, updatedDate);
+        this.attemptId = attemptId;
         this.externalKey = externalKey;
         this.directPaymentId = directPaymentId;
         this.transactionType = transactionType;
@@ -117,6 +119,10 @@ public class DefaultDirectPaymentTransaction extends EntityBase implements Direc
         return infoPlugin;
     }
 
+    public UUID getAttemptId() {
+        return attemptId;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("DefaultDirectPaymentTransaction{");
diff --git a/payment/src/main/java/org/killbill/billing/payment/control/InvoicePaymentControlPluginApi.java b/payment/src/main/java/org/killbill/billing/payment/control/InvoicePaymentControlPluginApi.java
index 8874dcc..0c621ac 100644
--- a/payment/src/main/java/org/killbill/billing/payment/control/InvoicePaymentControlPluginApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/control/InvoicePaymentControlPluginApi.java
@@ -36,6 +36,7 @@ import org.killbill.billing.invoice.api.Invoice;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
 import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoicePayment;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
@@ -139,27 +140,43 @@ public final class InvoicePaymentControlPluginApi implements PaymentControlPlugi
 
         final InternalCallContext internalContext = internalCallContextFactory.createInternalCallContext(paymentControlContext.getAccountId(), paymentControlContext);
         try {
+            final InvoicePayment existingInvoicePayment;
             switch (transactionType) {
                 case PURCHASE:
                     final UUID invoiceId = getInvoiceId(paymentControlContext);
-                    invoiceApi.notifyOfPayment(invoiceId,
-                                               paymentControlContext.getAmount(),
-                                               paymentControlContext.getCurrency(),
-                                               paymentControlContext.getProcessedCurrency(),
-                                               paymentControlContext.getPaymentId(),
-                                               paymentControlContext.getCreatedDate(),
-                                               internalContext);
+                    existingInvoicePayment = invoiceApi.getInvoicePaymentForAttempt(paymentControlContext.getPaymentId(), internalContext);
+                    if (existingInvoicePayment != null) {
+                        logger.info("onSuccessCall was already completed for payment purchase :" + paymentControlContext.getPaymentId());
+                    } else {
+                        invoiceApi.notifyOfPayment(invoiceId,
+                                                   paymentControlContext.getAmount(),
+                                                   paymentControlContext.getCurrency(),
+                                                   paymentControlContext.getProcessedCurrency(),
+                                                   paymentControlContext.getPaymentId(),
+                                                   paymentControlContext.getCreatedDate(),
+                                                   internalContext);
+                    }
                     break;
 
                 case REFUND:
-                    final Map<UUID, BigDecimal> idWithAmount = extractIdsWithAmountFromProperties(paymentControlContext.getPluginProperties());
-                    final PluginProperty prop = getPluginProperty(paymentControlContext.getPluginProperties(), PROP_IPCD_REFUND_WITH_ADJUSTMENTS);
-                    final boolean isAdjusted = prop != null ? Boolean.valueOf((String) prop.getValue()) : false;
-                    invoiceApi.createRefund(paymentControlContext.getPaymentId(), paymentControlContext.getAmount(), isAdjusted, idWithAmount, paymentControlContext.getTransactionExternalKey(), internalContext);
+                    existingInvoicePayment = invoiceApi.getInvoicePaymentForRefund(paymentControlContext.getPaymentId(), internalContext);
+                    if (existingInvoicePayment != null) {
+                        logger.info("onSuccessCall was already completed for payment refund :" + paymentControlContext.getPaymentId());
+                    } else {
+                        final Map<UUID, BigDecimal> idWithAmount = extractIdsWithAmountFromProperties(paymentControlContext.getPluginProperties());
+                        final PluginProperty prop = getPluginProperty(paymentControlContext.getPluginProperties(), PROP_IPCD_REFUND_WITH_ADJUSTMENTS);
+                        final boolean isAdjusted = prop != null ? Boolean.valueOf((String) prop.getValue()) : false;
+                        invoiceApi.createRefund(paymentControlContext.getPaymentId(), paymentControlContext.getAmount(), isAdjusted, idWithAmount, paymentControlContext.getTransactionExternalKey(), internalContext);
+                    }
                     break;
 
                 case CHARGEBACK:
-                    invoiceApi.createChargeback(paymentControlContext.getPaymentId(), paymentControlContext.getProcessedAmount(), paymentControlContext.getProcessedCurrency(), internalContext);
+                    existingInvoicePayment = invoiceApi.getInvoicePaymentForChargeback(paymentControlContext.getPaymentId(), internalContext);
+                    if (existingInvoicePayment != null) {
+                        logger.info("onSuccessCall was already completed for payment chargeback :" + paymentControlContext.getPaymentId());
+                    } else {
+                        invoiceApi.createChargeback(paymentControlContext.getPaymentId(), paymentControlContext.getProcessedAmount(), paymentControlContext.getProcessedCurrency(), internalContext);
+                    }
                     break;
 
                 default:
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/DirectPaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/DirectPaymentProcessor.java
index fdde4e9..9f05a04 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/DirectPaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/DirectPaymentProcessor.java
@@ -108,45 +108,45 @@ public class DirectPaymentProcessor extends ProcessorBase {
         this.directPaymentAutomatonRunner = directPaymentAutomatonRunner;
     }
 
-    public DirectPayment createAuthorization(final boolean isApiPayment, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    public DirectPayment createAuthorization(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                              final String directPaymentExternalKey, final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                              final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.AUTHORIZE, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.AUTHORIZE, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createCapture(final boolean isApiPayment, final Account account, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    public DirectPayment createCapture(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                        final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                        final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
 
-        return performOperation(isApiPayment, TransactionType.CAPTURE, account, null, directPaymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.CAPTURE, account, null, directPaymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createPurchase(final boolean isApiPayment, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    public DirectPayment createPurchase(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                         final String directPaymentExternalKey, final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                         final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.PURCHASE, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.PURCHASE, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createVoid(final boolean isApiPayment, final Account account, final UUID directPaymentId, final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
+    public DirectPayment createVoid(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, final UUID directPaymentId, final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                     final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.VOID, account, null, directPaymentId, null, null, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.VOID, account, null, directPaymentId, null, null, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createRefund(final boolean isApiPayment, final Account account, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    public DirectPayment createRefund(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                       final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                       final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.REFUND, account, null, directPaymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.REFUND, account, null, directPaymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createCredit(final boolean isApiPayment, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    public DirectPayment createCredit(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                       final String directPaymentExternalKey, final String directPaymentTransactionExternalKey, final boolean shouldLockAccountAndDispatch,
                                       final Iterable<PluginProperty> properties, final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.CREDIT, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.CREDIT, account, paymentMethodId, directPaymentId, amount, currency, directPaymentExternalKey, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, properties, callContext, internalCallContext);
     }
 
-    public DirectPayment createChargeback(final boolean isApiPayment, final Account account, final UUID paymentId, final String directPaymentTransactionExternalKey, final BigDecimal amount, final Currency currency, final boolean shouldLockAccountAndDispatch,
+    public DirectPayment createChargeback(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, final UUID paymentId, final String directPaymentTransactionExternalKey, final BigDecimal amount, final Currency currency, final boolean shouldLockAccountAndDispatch,
                                           final CallContext callContext, final InternalCallContext internalCallContext) throws PaymentApiException {
-        return performOperation(isApiPayment, TransactionType.CHARGEBACK, account, null, paymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, ImmutableList.<PluginProperty>of(), callContext, internalCallContext);
+        return performOperation(isApiPayment, attemptId, TransactionType.CHARGEBACK, account, null, paymentId, amount, currency, null, directPaymentTransactionExternalKey, shouldLockAccountAndDispatch, ImmutableList.<PluginProperty>of(), callContext, internalCallContext);
     }
 
     public List<DirectPayment> getAccountPayments(final UUID accountId, final InternalTenantContext tenantContext) throws PaymentApiException {
@@ -316,7 +316,7 @@ public class DirectPaymentProcessor extends ProcessorBase {
         return toDirectPayment(paymentModelDao, transactionsForAccount, pluginTransactions);
     }
 
-    private DirectPayment performOperation(final boolean isApiPayment, final TransactionType transactionType, final Account account, final UUID paymentMethodId, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
+    private DirectPayment performOperation(final boolean isApiPayment, @Nullable final UUID attemptId, final TransactionType transactionType, final Account account, final UUID paymentMethodId, final UUID directPaymentId, final BigDecimal amount, final Currency currency,
                                            final String directPaymentExternalKey, final String directPaymentTransactionExternalKey,
                                            final boolean shouldLockAccountAndDispatch, final Iterable<PluginProperty> properties, final CallContext callContext,
                                            final InternalCallContext internalCallContext) throws PaymentApiException {
@@ -327,6 +327,7 @@ public class DirectPaymentProcessor extends ProcessorBase {
 
             final UUID nonNullDirectPaymentId = directPaymentAutomatonRunner.run(transactionType,
                                                                                  account,
+                                                                                 attemptId,
                                                                                  paymentMethodId,
                                                                                  directPaymentId,
                                                                                  directPaymentExternalKey,
@@ -393,7 +394,7 @@ public class DirectPaymentProcessor extends ProcessorBase {
                                                               }
                                                           }).orNull() : null;
 
-                return new DefaultDirectPaymentTransaction(input.getId(), input.getTransactionExternalKey(), input.getCreatedDate(), input.getUpdatedDate(), input.getPaymentId(),
+                return new DefaultDirectPaymentTransaction(input.getId(), input.getAttemptId(), input.getTransactionExternalKey(), input.getCreatedDate(), input.getUpdatedDate(), input.getPaymentId(),
                                                            input.getTransactionType(), input.getEffectiveDate(), input.getTransactionStatus(), input.getAmount(), input.getCurrency(),
                                                            input.getProcessedAmount(), input.getProcessedCurrency(),
                                                            input.getGatewayErrorCode(), input.getGatewayErrorMsg(), info);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
index 02de4a5..8b9ae94 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
@@ -17,6 +17,7 @@
 
 package org.killbill.billing.payment.core;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -25,18 +26,36 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountApiException;
+import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.DefaultCallContext;
+import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.sm.PluginControlledDirectPaymentAutomatonRunner;
+import org.killbill.billing.payment.core.sm.RetryableDirectPaymentStateContext;
+import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.dao.PluginPropertySerializer;
+import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
 import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
 /**
  * Takes care of incomplete payment/transactions.
  */
@@ -45,32 +64,52 @@ public class Janitor {
     private final static Logger log = LoggerFactory.getLogger(Janitor.class);
 
     private final ScheduledExecutorService janitorExecutor;
+    private final AccountInternalApi accountInternalApi;
     private final PaymentDao paymentDao;
     private final Clock clock;
     private final PaymentConfig paymentConfig;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final NonEntityDao nonEntityDao;
+    private final PluginControlledDirectPaymentAutomatonRunner pluginControlledDirectPaymentAutomatonRunner;
+
+    private volatile boolean isStopped;
 
     @Inject
-    public Janitor(final PaymentDao paymentDao,
+    public Janitor(final AccountInternalApi accountInternalApi,
+                   final PaymentDao paymentDao,
                    final PaymentConfig paymentConfig,
                    final Clock clock,
+                   final NonEntityDao nonEntityDao,
                    final InternalCallContextFactory internalCallContextFactory,
+                   final PluginControlledDirectPaymentAutomatonRunner pluginControlledDirectPaymentAutomatonRunner,
                    @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor) {
+        this.accountInternalApi = accountInternalApi;
         this.paymentDao = paymentDao;
         this.clock = clock;
         this.paymentConfig = paymentConfig;
         this.janitorExecutor = janitorExecutor;
+        this.nonEntityDao = nonEntityDao;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.pluginControlledDirectPaymentAutomatonRunner = pluginControlledDirectPaymentAutomatonRunner;
     }
 
     public void start() {
-        final TimeUnit rateUnit = paymentConfig.getJanitorRunningRate().getUnit();
-        final long period = paymentConfig.getJanitorRunningRate().getPeriod();
-        janitorExecutor.scheduleAtFixedRate(new PendingTransactionTask(), period, period, rateUnit);
+        isStopped = false;
+        // Start task for removing old pending payments.
+        final TimeUnit pendingRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
+        final long pendingPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
+        janitorExecutor.scheduleAtFixedRate(new PendingTransactionTask(), pendingPeriod, pendingPeriod, pendingRateUnit);
+
+
+        // Start task for completing incomplete payment attempts
+        final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
+        final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
+        janitorExecutor.scheduleAtFixedRate(new AttemptCompletionTask(), attemptCompletionPeriod, attemptCompletionPeriod, attemptCompletionRateUnit);
     }
 
     public void stop() {
-        janitorExecutor.shutdown();
+        isStopped = true;
+        //janitorExecutor.shutdownNow();
     }
 
     /**
@@ -78,20 +117,124 @@ public class Janitor {
      */
     private final class PendingTransactionTask implements Runnable {
 
-        private final InternalTenantContext fakeCallContext;
+        private final InternalCallContext fakeCallContext;
 
         private PendingTransactionTask() {
             this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
         }
 
+        @Override
+        public void run() {
+            if (isStopped) {
+                return;
+            }
+            log.info("PendingTransactionTask start run ");
+
+            int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+            if (result > 0) {
+                log.info("PendingTransactionTask moved " + result + " PENDING payments ->  PLUGIN_FAILURE");
+            }
+        }
+
         private DateTime getCreatedDateBefore() {
             final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
             return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
         }
+    }
+
+    /**
+     * Task to complete 'partially' incomplete attempts
+     * <p/>
+     * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
+     * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
+     */
+    private final class AttemptCompletionTask implements Runnable {
+
+        private final InternalCallContext fakeCallContext;
+
+        private AttemptCompletionTask() {
+            this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+        }
 
         @Override
         public void run() {
-            paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+
+            if (isStopped) {
+                return;
+            }
+
+            // STEPH state string hack
+            final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState("INIT", getCreatedDateBefore(), fakeCallContext);
+            log.info("AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
+
+            for (PaymentAttemptModelDao cur : incompleteAttempts) {
+                complete(cur);
+            }
+        }
+
+        private DateTime getCreatedDateBefore() {
+            final long delayBeforeNowMs = paymentConfig.getJanitorAttemptCompletionTime().getMillis();
+            return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+        }
+
+        private void complete(final PaymentAttemptModelDao attempt) {
+
+            // STEPH seems a bit insane??
+            final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getAccountId(), attempt.getId(), ObjectType.PAYMENT_ATTEMPT);
+            final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT);
+            final CallContext callContext = new DefaultCallContext(tenantId, "AttemptCompletionJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID(), clock);
+            final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);
+
+            final List<PaymentTransactionModelDao> transactions = paymentDao.getDirectPaymentTransactionsByExternalKey(attempt.getTransactionExternalKey(), tenantContext);
+            final PaymentTransactionModelDao transaction = Iterables.tryFind(transactions, new Predicate<PaymentTransactionModelDao>() {
+                @Override
+                public boolean apply(final PaymentTransactionModelDao input) {
+                    return input.getAttemptId().equals(attempt.getId()) &&
+                           input.getTransactionStatus() == TransactionStatus.SUCCESS;
+                }
+            }).orNull();
+
+            if (transaction == null) {
+                log.info("AttemptCompletionTask moving attempt " + attempt.getId() + " -> ABORTED");
+                paymentDao.updatePaymentAttempt(attempt.getId(), attempt.getTransactionId(), "ABORTED", internalCallContext);
+                return;
+            }
+
+            try {
+
+                log.info("AttemptCompletionTask completing attempt " + attempt.getId() + " -> SUCCESS");
+
+                final Account account = accountInternalApi.getAccountById(attempt.getAccountId(), tenantContext);
+                final boolean isApiPayment = true; // unclear
+                final RetryableDirectPaymentStateContext paymentStateContext = new RetryableDirectPaymentStateContext(attempt.getPluginName(),
+                                                                                                                isApiPayment,
+                                                                                                                transaction.getPaymentId(),
+                                                                                                                attempt.getPaymentExternalKey(),
+                                                                                                                transaction.getTransactionExternalKey(),
+                                                                                                                transaction.getTransactionType(),
+                                                                                                                account,
+                                                                                                                attempt.getPaymentMethodId(),
+                                                                                                                transaction.getAmount(),
+                                                                                                                transaction.getCurrency(),
+                                                                                                                PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
+                                                                                                                internalCallContext,
+                                                                                                                callContext);
+
+                paymentStateContext.setAttemptId(attempt.getId()); // Normally set by leavingState Callback
+                paymentStateContext.setDirectPaymentTransactionModelDao(transaction); // Normally set by raw state machine
+                //
+                // Will rerun the state machine with special callbacks to only make the onCompletion call
+                // to the PaymentControlPluginApi plugin and transition the state.
+                //
+                pluginControlledDirectPaymentAutomatonRunner.completeRun(paymentStateContext);
+
+            } catch (AccountApiException e) {
+                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+            } catch (PluginPropertySerializerException e) {
+                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+            } catch (PaymentApiException e) {
+                log.warn("Failed to complete payment attempt " + attempt.getId(), e);
+            }
         }
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
index 23d619d..09d6616 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
@@ -180,7 +180,8 @@ public abstract class ProcessorBase {
         final PaymentTransactionModelDao transactionAlreadyExists = Iterables.tryFind(transactions, new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
-                return input.getTransactionStatus() == TransactionStatus.SUCCESS;
+                return input.getTransactionStatus() == TransactionStatus.SUCCESS ||
+                       input.getTransactionStatus() == TransactionStatus.UNKNOWN;
             }
         }).orNull();
         if (transactionAlreadyExists != null) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonDAOHelper.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonDAOHelper.java
index 930a29e..4be2d4c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonDAOHelper.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonDAOHelper.java
@@ -162,6 +162,7 @@ public class DirectPaymentAutomatonDAOHelper {
 
         return new PaymentTransactionModelDao(createdDate,
                                               updatedDate,
+                                              directPaymentStateContext.getAttemptId(),
                                               directPaymentStateContext.getDirectPaymentTransactionExternalKey(),
                                               directPaymentId,
                                               directPaymentStateContext.getTransactionType(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonRunner.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonRunner.java
index 05fd7ff..b13ada1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonRunner.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentAutomatonRunner.java
@@ -92,7 +92,7 @@ public class DirectPaymentAutomatonRunner {
 
     }
 
-    public UUID run(final TransactionType transactionType, final Account account, @Nullable final UUID paymentMethodId,
+    public UUID run(final TransactionType transactionType, final Account account, @Nullable final UUID attemptId, @Nullable final UUID paymentMethodId,
                     @Nullable final UUID directPaymentId, @Nullable final String directPaymentExternalKey, final String directPaymentTransactionExternalKey,
                     @Nullable final BigDecimal amount, @Nullable final Currency currency,
                     final boolean shouldLockAccount, final Iterable<PluginProperty> properties,
@@ -100,7 +100,7 @@ public class DirectPaymentAutomatonRunner {
 
         final DateTime utcNow = clock.getUTCNow();
 
-        final DirectPaymentStateContext directPaymentStateContext = new DirectPaymentStateContext(directPaymentId, directPaymentExternalKey, directPaymentTransactionExternalKey, transactionType,
+        final DirectPaymentStateContext directPaymentStateContext = new DirectPaymentStateContext(directPaymentId, attemptId, directPaymentExternalKey, directPaymentTransactionExternalKey, transactionType,
                                                                                                   account, paymentMethodId, amount, currency, shouldLockAccount, properties, internalCallContext, callContext);
         final DirectPaymentAutomatonDAOHelper daoHelper = new DirectPaymentAutomatonDAOHelper(directPaymentStateContext, utcNow, paymentDao, pluginRegistry, internalCallContext);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentStateContext.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentStateContext.java
index dfeeaa6..8044e86 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentStateContext.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/DirectPaymentStateContext.java
@@ -38,6 +38,7 @@ public class DirectPaymentStateContext {
 
     // HACK
     protected UUID paymentMethodId;
+    protected UUID attemptId;
 
     // Stateful objects created by the callbacks and passed to the other following callbacks in the automaton
     protected List<PaymentTransactionModelDao> onLeavingStateExistingTransactions;
@@ -63,17 +64,18 @@ public class DirectPaymentStateContext {
                                      final Account account, @Nullable final UUID paymentMethodId, final BigDecimal amount, final Currency currency,
                                      final boolean shouldLockAccountAndDispatch, final Iterable<PluginProperty> properties,
                                      final InternalCallContext internalCallContext, final CallContext callContext) {
-        this(directPaymentId, null, directPaymentTransactionExternalKey, transactionType, account, paymentMethodId,
+        this(directPaymentId, null, null, directPaymentTransactionExternalKey, transactionType, account, paymentMethodId,
              amount, currency, shouldLockAccountAndDispatch, properties, internalCallContext, callContext);
     }
 
     // Used to create new payment and transactions
-    public DirectPaymentStateContext(@Nullable final UUID directPaymentId, @Nullable final String directPaymentExternalKey,
+    public DirectPaymentStateContext(@Nullable final UUID directPaymentId, @Nullable final UUID attemptId, @Nullable final String directPaymentExternalKey,
                                      @Nullable final String directPaymentTransactionExternalKey, final TransactionType transactionType,
                                      final Account account, @Nullable final UUID paymentMethodId, final BigDecimal amount, final Currency currency,
                                      final boolean shouldLockAccountAndDispatch, final Iterable<PluginProperty> properties,
                                      final InternalCallContext internalCallContext, final CallContext callContext) {
         this.directPaymentId = directPaymentId;
+        this.attemptId= attemptId;
         this.directPaymentExternalKey = directPaymentExternalKey;
         this.directPaymentTransactionExternalKey = directPaymentTransactionExternalKey;
         this.transactionType = transactionType;
@@ -144,6 +146,14 @@ public class DirectPaymentStateContext {
         return paymentMethodId;
     }
 
+    public UUID getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(final UUID attemptId) {
+        this.attemptId = attemptId;
+    }
+
     public BigDecimal getAmount() {
         return amount;
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlledDirectPaymentAutomatonRunner.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlledDirectPaymentAutomatonRunner.java
index c48a3ca..31a6376 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlledDirectPaymentAutomatonRunner.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlledDirectPaymentAutomatonRunner.java
@@ -86,6 +86,7 @@ public class PluginControlledDirectPaymentAutomatonRunner extends DirectPaymentA
         this.retryOperation = fetchRetryOperation();
     }
 
+
     public DirectPayment run(final boolean isApiPayment, final TransactionType transactionType, final Account account, @Nullable final UUID paymentMethodId,
                              @Nullable final UUID directPaymentId, @Nullable final String directPaymentExternalKey, final String directPaymentTransactionExternalKey,
                              @Nullable final BigDecimal amount, @Nullable final Currency currency,
@@ -128,6 +129,31 @@ public class PluginControlledDirectPaymentAutomatonRunner extends DirectPaymentA
         return directPaymentStateContext.getResult();
     }
 
+    public DirectPayment completeRun(final RetryableDirectPaymentStateContext paymentStateContext) throws PaymentApiException {
+
+        try {
+
+            final OperationCallback callback = new RetryCompletionOperationCallback(locker, paymentPluginDispatcher, paymentStateContext, directPaymentProcessor, paymentControlPluginRegistry);
+            final LeavingStateCallback leavingStateCallback = new RetryNoopLeavingStateCallback();
+            final EnteringStateCallback enteringStateCallback = new RetryEnteringStateCallback(this, paymentStateContext, retryServiceScheduler);
+
+            initialState.runOperation(retryOperation, callback, enteringStateCallback, leavingStateCallback);
+
+        } catch (MissingEntryException e) {
+            throw new PaymentApiException(e.getCause(), ErrorCode.PAYMENT_INTERNAL_ERROR, Objects.firstNonNull(e.getMessage(), ""));
+        } catch (OperationException e) {
+            if (e.getCause() == null) {
+                throw new PaymentApiException(e, ErrorCode.PAYMENT_INTERNAL_ERROR, Objects.firstNonNull(e.getMessage(), ""));
+            } else if (e.getCause() instanceof PaymentApiException) {
+                throw (PaymentApiException) e.getCause();
+            } else {
+                throw new PaymentApiException(e.getCause(), ErrorCode.PAYMENT_INTERNAL_ERROR, Objects.firstNonNull(e.getMessage(), ""));
+            }
+        }
+        return paymentStateContext.getResult();
+    }
+
+
     // STEPH to be moved
     public final State fetchState(final String stateName) {
         try {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryableDirectPaymentStateContext.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryableDirectPaymentStateContext.java
index e085666..e85d76e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryableDirectPaymentStateContext.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryableDirectPaymentStateContext.java
@@ -25,6 +25,8 @@ import org.joda.time.DateTime;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.api.DefaultDirectPayment;
+import org.killbill.billing.payment.api.DefaultDirectPaymentTransaction;
 import org.killbill.billing.payment.api.DirectPayment;
 import org.killbill.billing.payment.api.DirectPaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
@@ -33,11 +35,9 @@ import org.killbill.billing.util.callcontext.CallContext;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
 
 public class RetryableDirectPaymentStateContext extends DirectPaymentStateContext {
 
-    private UUID attemptId;
     private boolean isApiPayment;
     private DateTime retryDate;
     private String pluginName;
@@ -47,8 +47,7 @@ public class RetryableDirectPaymentStateContext extends DirectPaymentStateContex
                                               @Nullable final String directPaymentTransactionExternalKey, final TransactionType transactionType,
                                               final Account account, @Nullable final UUID paymentMethodId, final BigDecimal amount, final Currency currency,
                                               final Iterable<PluginProperty> properties, final InternalCallContext internalCallContext, final CallContext callContext) {
-        super(directPaymentId, directPaymentExternalKey, directPaymentTransactionExternalKey, transactionType, account, paymentMethodId, amount, currency, true, properties, internalCallContext, callContext);
-        this.attemptId = attemptId;
+        super(directPaymentId, null, directPaymentExternalKey, directPaymentTransactionExternalKey, transactionType, account, paymentMethodId, amount, currency, true, properties, internalCallContext, callContext);
         this.pluginName = pluginName;
         this.isApiPayment = isApiPayment;
     }
@@ -85,18 +84,15 @@ public class RetryableDirectPaymentStateContext extends DirectPaymentStateContex
         this.amount = adjustedAmount;
     }
 
-    public UUID getAttemptId() {
-        return attemptId;
-    }
-
-    public void setAttemptId(final UUID attemptId) {
-        this.attemptId = attemptId;
-    }
-
     public DirectPaymentTransaction getCurrentTransaction() {
-        if (result == null || result.getTransactions() == null || result.getTransactions().size() == 0) {
+        if (result == null || result.getTransactions() == null) {
             return null;
         }
-        return result.getTransactions().get(result.getTransactions().size() -1);
+        return Iterables.tryFind(result.getTransactions(), new Predicate<DirectPaymentTransaction>() {
+            @Override
+            public boolean apply(final DirectPaymentTransaction input) {
+                return ((DefaultDirectPaymentTransaction) input).getAttemptId().equals(attemptId);
+            }
+        }).orNull();
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryAuthorizeOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryAuthorizeOperationCallback.java
index 5159d46..4605604 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryAuthorizeOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryAuthorizeOperationCallback.java
@@ -34,6 +34,7 @@ public class RetryAuthorizeOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createAuthorization(retryableDirectPaymentStateContext.isApiPayment(),
+                                                          retryableDirectPaymentStateContext.getAttemptId(),
                                                           retryableDirectPaymentStateContext.getAccount(),
                                                           retryableDirectPaymentStateContext.getPaymentMethodId(),
                                                           retryableDirectPaymentStateContext.getDirectPaymentId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCaptureOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCaptureOperationCallback.java
index 88cd362..5090352 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCaptureOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCaptureOperationCallback.java
@@ -34,6 +34,7 @@ public class RetryCaptureOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createCapture(retryableDirectPaymentStateContext.isApiPayment(),
+                                                    retryableDirectPaymentStateContext.getAttemptId(),
                                                     retryableDirectPaymentStateContext.getAccount(),
                                                     retryableDirectPaymentStateContext.getPaymentMethodId(),
                                                     retryableDirectPaymentStateContext.getAmount(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryChargebackOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryChargebackOperationCallback.java
index d442ceb..2ea1a21 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryChargebackOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryChargebackOperationCallback.java
@@ -35,6 +35,7 @@ public class RetryChargebackOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createChargeback(retryableDirectPaymentStateContext.isApiPayment(),
+                                                       retryableDirectPaymentStateContext.getAttemptId(),
                                                        retryableDirectPaymentStateContext.getAccount(),
                                                        retryableDirectPaymentStateContext.getDirectPaymentId(),
                                                        retryableDirectPaymentStateContext.getDirectPaymentTransactionExternalKey(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCompletionOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCompletionOperationCallback.java
new file mode 100644
index 0000000..e336cc6
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCompletionOperationCallback.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.sm;
+
+import org.killbill.automaton.OperationException;
+import org.killbill.automaton.OperationResult;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.DirectPayment;
+import org.killbill.billing.payment.api.PaymentApiException;
+import org.killbill.billing.payment.core.DirectPaymentProcessor;
+import org.killbill.billing.payment.core.ProcessorBase.WithAccountLockCallback;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.dispatcher.PluginDispatcher;
+import org.killbill.billing.retry.plugin.api.PaymentControlContext;
+import org.killbill.billing.retry.plugin.api.PaymentControlPluginApi;
+import org.killbill.commons.locker.GlobalLocker;
+
+public class RetryCompletionOperationCallback extends RetryOperationCallback {
+
+    public RetryCompletionOperationCallback(final GlobalLocker locker, final PluginDispatcher<OperationResult> paymentPluginDispatcher, final RetryableDirectPaymentStateContext directPaymentStateContext, final DirectPaymentProcessor directPaymentProcessor, final OSGIServiceRegistration<PaymentControlPluginApi> retryPluginRegistry) {
+        super(locker, paymentPluginDispatcher, directPaymentStateContext, directPaymentProcessor, retryPluginRegistry);
+    }
+
+    @Override
+    public OperationResult doOperationCallback() throws OperationException {
+
+        return dispatchWithAccountLockAndTimeout(new WithAccountLockCallback<OperationResult, OperationException>() {
+            @Override
+            public OperationResult doOperation() throws OperationException {
+                final PaymentTransactionModelDao transaction = directPaymentStateContext.getDirectPaymentTransactionModelDao();
+                final PaymentControlContext updatedPaymentControlContext = new DefaultPaymentControlContext(directPaymentStateContext.getAccount(),
+                                                                                                            directPaymentStateContext.getPaymentMethodId(),
+                                                                                                            retryableDirectPaymentStateContext.getAttemptId(),
+                                                                                                            transaction.getPaymentId(),
+                                                                                                            directPaymentStateContext.getDirectPaymentExternalKey(),
+                                                                                                            transaction.getId(),
+                                                                                                            directPaymentStateContext.getDirectPaymentTransactionExternalKey(),
+                                                                                                            directPaymentStateContext.getTransactionType(),
+                                                                                                            transaction.getAmount(),
+                                                                                                            transaction.getCurrency(),
+                                                                                                            transaction.getProcessedAmount(),
+                                                                                                            transaction.getProcessedCurrency(),
+                                                                                                            directPaymentStateContext.getProperties(),
+                                                                                                            retryableDirectPaymentStateContext.isApiPayment(),
+                                                                                                            directPaymentStateContext.callContext);
+
+                onCompletion(retryableDirectPaymentStateContext.getPluginName(), updatedPaymentControlContext);
+                return OperationResult.SUCCESS;
+            }
+        });
+    }
+
+    @Override
+    protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
+        return null;
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCreditOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCreditOperationCallback.java
index 1b309d9..f5e4084 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCreditOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryCreditOperationCallback.java
@@ -34,6 +34,7 @@ public class RetryCreditOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createCredit(retryableDirectPaymentStateContext.isApiPayment(),
+                                                   retryableDirectPaymentStateContext.getAttemptId(),
                                                    retryableDirectPaymentStateContext.getAccount(),
                                                    retryableDirectPaymentStateContext.getPaymentMethodId(),
                                                    retryableDirectPaymentStateContext.getDirectPaymentId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryNoopLeavingStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryNoopLeavingStateCallback.java
new file mode 100644
index 0000000..2a45c35
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryNoopLeavingStateCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.sm;
+
+import org.killbill.automaton.OperationException;
+import org.killbill.automaton.State;
+import org.killbill.automaton.State.LeavingStateCallback;
+
+public class RetryNoopLeavingStateCallback implements LeavingStateCallback {
+
+    @Override
+    public void leavingState(final State oldState) throws OperationException {
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryOperationCallback.java
index 7c7bd6e..7250e6e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryOperationCallback.java
@@ -183,6 +183,15 @@ public abstract class RetryOperationCallback extends OperationCallbackBase imple
         return new OperationException(e, getOperationResultOnException(directPaymentStateContext));
     }
 
+    protected void onCompletion(final String pluginName, final PaymentControlContext paymentControlContext) {
+        final PaymentControlPluginApi plugin = paymentControlPluginRegistry.getServiceForName(pluginName);
+        try {
+            plugin.onSuccessCall(paymentControlContext);
+        } catch (PaymentControlApiException e) {
+            logger.warn("Plugin " + pluginName + " failed to complete onCompletion call for " + paymentControlContext.getPaymentExternalKey(), e);
+        }
+    }
+
     private OperationResult getOperationResultOnException(final DirectPaymentStateContext directPaymentStateContext) {
         final RetryableDirectPaymentStateContext retryableDirectPaymentStateContext = (RetryableDirectPaymentStateContext) directPaymentStateContext;
         final OperationResult operationResult = retryableDirectPaymentStateContext.getRetryDate() != null ? OperationResult.FAILURE : OperationResult.EXCEPTION;
@@ -217,16 +226,8 @@ public abstract class RetryOperationCallback extends OperationCallbackBase imple
         }
     }
 
-    private void onCompletion(final String pluginName, final PaymentControlContext paymentControlContext) {
-        final PaymentControlPluginApi plugin = paymentControlPluginRegistry.getServiceForName(pluginName);
-        try {
-            plugin.onSuccessCall(paymentControlContext);
-        } catch (PaymentControlApiException e) {
-            logger.warn("Plugin " + pluginName + " failed to complete onCompletion call for " + paymentControlContext.getPaymentExternalKey(), e);
-        }
-    }
 
-    public class DefaultPaymentControlContext extends DefaultCallContext implements PaymentControlContext {
+    public static class DefaultPaymentControlContext extends DefaultCallContext implements PaymentControlContext {
 
         private final Account account;
         private final UUID paymentMethodId;
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryPurchaseOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryPurchaseOperationCallback.java
index 1e5636a..f767aec 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryPurchaseOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryPurchaseOperationCallback.java
@@ -34,6 +34,7 @@ public class RetryPurchaseOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createPurchase(retryableDirectPaymentStateContext.isApiPayment(),
+                                                     retryableDirectPaymentStateContext.getAttemptId(),
                                                      retryableDirectPaymentStateContext.getAccount(),
                                                      retryableDirectPaymentStateContext.getPaymentMethodId(),
                                                      retryableDirectPaymentStateContext.getDirectPaymentId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryRefundOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryRefundOperationCallback.java
index 2fc77fb..fd56cf9 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryRefundOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryRefundOperationCallback.java
@@ -34,6 +34,7 @@ public class RetryRefundOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createRefund(retryableDirectPaymentStateContext.isApiPayment(),
+                                                   retryableDirectPaymentStateContext.getAttemptId(),
                                                    retryableDirectPaymentStateContext.getAccount(),
                                                    retryableDirectPaymentStateContext.getDirectPaymentId(),
                                                    retryableDirectPaymentStateContext.getAmount(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryVoidOperationCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryVoidOperationCallback.java
index a2f2382..a6e2681 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryVoidOperationCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryVoidOperationCallback.java
@@ -34,12 +34,13 @@ public class RetryVoidOperationCallback extends RetryOperationCallback {
     @Override
     protected DirectPayment doCallSpecificOperationCallback() throws PaymentApiException {
         return directPaymentProcessor.createVoid(retryableDirectPaymentStateContext.isApiPayment(),
-                                                   retryableDirectPaymentStateContext.getAccount(),
-                                                   retryableDirectPaymentStateContext.getDirectPaymentId(),
-                                                   retryableDirectPaymentStateContext.getDirectPaymentTransactionExternalKey(),
-                                                   false,
-                                                   retryableDirectPaymentStateContext.getProperties(),
-                                                   retryableDirectPaymentStateContext.getCallContext(),
-                                                   retryableDirectPaymentStateContext.getInternalCallContext());
+                                                 retryableDirectPaymentStateContext.getAttemptId(),
+                                                 retryableDirectPaymentStateContext.getAccount(),
+                                                 retryableDirectPaymentStateContext.getDirectPaymentId(),
+                                                 retryableDirectPaymentStateContext.getDirectPaymentTransactionExternalKey(),
+                                                 false,
+                                                 retryableDirectPaymentStateContext.getProperties(),
+                                                 retryableDirectPaymentStateContext.getCallContext(),
+                                                 retryableDirectPaymentStateContext.getInternalCallContext());
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index d9cb89c..4092ce1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -103,6 +103,17 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
+    public List<PaymentAttemptModelDao> getPaymentAttemptsByState(final String stateName, final DateTime createdBeforeDate, final InternalTenantContext context) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
+            @Override
+            public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                final PaymentAttemptSqlDao transactional = entitySqlDaoWrapperFactory.become(PaymentAttemptSqlDao.class);
+                return transactional.getByStateName(stateName, createdBeforeDate.toDate(), context);
+            }
+        });
+    }
+
+    @Override
     public List<PaymentAttemptModelDao> getPaymentAttempts(final String paymentExternalKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
 
@@ -127,10 +138,10 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context) {
-         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context) {
+         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Integer>() {
             @Override
-            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+            public Integer inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
                 final List<PaymentTransactionModelDao> oldPendingTransactions = transactional.getByTransactionStatusPriorDate(TransactionStatus.PENDING.toString(), createdBeforeDate.toDate(), context);
                 if (oldPendingTransactions.size() > 0) {
@@ -140,9 +151,9 @@ public class DefaultPaymentDao implements PaymentDao {
                             return input.getId().toString();
                         }
                     });
-                    transactional.failOldPendingTransactions(oldPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), context);
+                    return transactional.failOldPendingTransactions(oldPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), context);
                 }
-                return null;
+                return 0;
             }
         });
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
index d66a717..e3a3fe4 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -16,6 +16,7 @@
 
 package org.killbill.billing.payment.dao;
 
+import java.util.Date;
 import java.util.List;
 
 import org.killbill.billing.callcontext.InternalCallContext;
@@ -42,10 +43,15 @@ public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDa
 
     @SqlQuery
     List<PaymentAttemptModelDao> getByTransactionExternalKey(@Bind("transactionExternalKey") final String transactionExternalKey,
-                                                       @BindBean final InternalTenantContext context);
+                                                             @BindBean final InternalTenantContext context);
 
     @SqlQuery
     List<PaymentAttemptModelDao> getByPaymentExternalKey(@Bind("paymentExternalKey") final String paymentExternalKey,
+                                                         @BindBean final InternalTenantContext context);
+
+    @SqlQuery
+    List<PaymentAttemptModelDao> getByStateName(@Bind("stateName") final String stateName,
+                                                @Bind("createdBeforeDate") final Date createdBeforeDate,
                                                 @BindBean final InternalTenantContext context);
 
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index a452ccd..b6cfac0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -29,12 +29,14 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context);
+    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
     public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
 
+    public List<PaymentAttemptModelDao> getPaymentAttemptsByState(String stateName, final DateTime createdBeforeDate, InternalTenantContext context);
+
     public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);
 
     public List<PaymentAttemptModelDao> getPaymentAttemptByTransactionExternalKey(String externalKey, InternalTenantContext context);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentTransactionModelDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentTransactionModelDao.java
index eb5aae7..e718a6f 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentTransactionModelDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentTransactionModelDao.java
@@ -34,6 +34,7 @@ import com.google.common.base.Objects;
 
 public class PaymentTransactionModelDao extends EntityBase implements EntityModelDao<DirectPaymentTransaction> {
 
+    private UUID attemptId;
     private UUID paymentId;
     private String transactionExternalKey;
     private TransactionType transactionType;
@@ -49,10 +50,11 @@ public class PaymentTransactionModelDao extends EntityBase implements EntityMode
 
     public PaymentTransactionModelDao() { /* For the DAO mapper */ }
 
-    public PaymentTransactionModelDao(final UUID id, @Nullable final String transactionExternalKey, @Nullable final DateTime createdDate, @Nullable final DateTime updatedDate,
+    public PaymentTransactionModelDao(final UUID id, @Nullable final UUID attemptId, @Nullable final String transactionExternalKey, @Nullable final DateTime createdDate, @Nullable final DateTime updatedDate,
                                       final UUID paymentId, final TransactionType transactionType, final DateTime effectiveDate,
                                       final TransactionStatus paymentStatus, final BigDecimal amount, final Currency currency, final String gatewayErrorCode, final String gatewayErrorMsg) {
         super(id, createdDate, updatedDate);
+        this.attemptId = attemptId;
         this.transactionExternalKey = Objects.firstNonNull(transactionExternalKey, id.toString());
         this.paymentId = paymentId;
         this.transactionType = transactionType;
@@ -66,16 +68,24 @@ public class PaymentTransactionModelDao extends EntityBase implements EntityMode
         this.gatewayErrorMsg = gatewayErrorMsg;
     }
 
-    public PaymentTransactionModelDao(@Nullable final DateTime createdDate, @Nullable final DateTime updatedDate,
+    public PaymentTransactionModelDao(@Nullable final DateTime createdDate, @Nullable final DateTime updatedDate, @Nullable final UUID attemptId,
                                       @Nullable final String transactionExternalKey, final UUID paymentId, final TransactionType transactionType, final DateTime effectiveDate,
                                       final TransactionStatus paymentStatus, final BigDecimal amount, final Currency currency, final String gatewayErrorCode, final String gatewayErrorMsg) {
-        this(UUID.randomUUID(), transactionExternalKey, createdDate, updatedDate, paymentId, transactionType, effectiveDate, paymentStatus, amount, currency, gatewayErrorCode, gatewayErrorMsg);
+        this(UUID.randomUUID(), attemptId, transactionExternalKey, createdDate, updatedDate, paymentId, transactionType, effectiveDate, paymentStatus, amount, currency, gatewayErrorCode, gatewayErrorMsg);
     }
 
     public UUID getPaymentId() {
         return paymentId;
     }
 
+    public UUID getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(final UUID attemptId) {
+        this.attemptId = attemptId;
+    }
+
     public String getTransactionExternalKey() {
         return transactionExternalKey;
     }
@@ -180,6 +190,9 @@ public class PaymentTransactionModelDao extends EntityBase implements EntityMode
         if (currency != that.currency) {
             return false;
         }
+        if (attemptId != null ? !attemptId.equals(that.attemptId) : that.attemptId != null) {
+            return false;
+        }
         if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
             return false;
         }
@@ -214,6 +227,7 @@ public class PaymentTransactionModelDao extends EntityBase implements EntityMode
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
+        result = 31 * result + (attemptId != null ? attemptId.hashCode() : 0);
         result = 31 * result + (transactionExternalKey != null ? transactionExternalKey.hashCode() : 0);
         result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
         result = 31 * result + (effectiveDate != null ? effectiveDate.hashCode() : 0);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index 6e7ad8b..fb29b85 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -59,9 +59,9 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
 
     @SqlUpdate
     @Audited(ChangeType.UPDATE)
-    void failOldPendingTransactions(@UUIDCollectionBinder final Collection<String> pendingTransactionIds,
+    int failOldPendingTransactions(@UUIDCollectionBinder final Collection<String> pendingTransactionIds,
                                     @Bind("newTransactionStatus") final String newTransactionStatus,
-                                    @BindBean final InternalTenantContext context);
+                                    @BindBean final InternalCallContext context);
 
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index 1c4083b..1cc0604 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -64,6 +64,18 @@ where payment_external_key = :paymentExternalKey
 ;
 >>
 
+/* Does not include tenant info, global */
+getByStateName() ::= <<
+select
+<allTableFields("")>
+from <tableName()>
+where state_name = :stateName
+and created_date \< :createdBeforeDate
+<andCheckSoftDeletionWithComma("")>
+<defaultOrderBy()>
+;
+>>
+
 
 updateAttempt() ::= <<
 update <tableName()>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index 4a6f916..35563c0 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -9,7 +9,8 @@ order by <prefix>effective_date ASC, <recordIdField(prefix)> ASC
 >>
 
 tableFields(prefix) ::= <<
-  <prefix>transaction_external_key
+  <prefix>attempt_id
+, <prefix>transaction_external_key
 , <prefix>transaction_type
 , <prefix>effective_date
 , <prefix>transaction_status
@@ -27,7 +28,8 @@ tableFields(prefix) ::= <<
 >>
 
 tableValues() ::= <<
-  :transactionExternalKey
+  :attemptId
+, :transactionExternalKey
 , :transactionType
 , :effectiveDate
 , :transactionStatus
@@ -78,6 +80,7 @@ where payment_id = :paymentId
 ;
 >>
 
+
 /* Does not include AND_CHECK_TENANT() since this is a global operation */
 getByTransactionStatusPriorDate() ::= <<
 select <allTableFields()>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/ddl.sql b/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
index 0d6a797..9338a54 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
+++ b/payment/src/main/resources/org/killbill/billing/payment/ddl.sql
@@ -27,6 +27,7 @@ CREATE TABLE payment_attempts (
 CREATE UNIQUE INDEX payment_attempts_id ON payment_attempts(id);
 CREATE INDEX payment_attempts_payment ON payment_attempts(transaction_id);
 CREATE INDEX payment_attempts_payment_key ON payment_attempts(payment_external_key);
+CREATE INDEX payment_attempts_payment_state ON payment_attempts(state_name);
 CREATE INDEX payment_attempts_payment_transaction_key ON payment_attempts(transaction_external_key);
 CREATE INDEX payment_attempts_tenant_account_record_id ON payment_attempts(tenant_record_id, account_record_id);
 
@@ -151,6 +152,7 @@ DROP TABLE IF EXISTS transactions;
 CREATE TABLE transactions (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
     id char(36) NOT NULL,
+    attempt_id char(36) DEFAULT NULL,
     transaction_external_key varchar(255) NOT NULL,
     transaction_type varchar(32) NOT NULL,
     effective_date datetime NOT NULL,
@@ -173,12 +175,14 @@ CREATE TABLE transactions (
 CREATE UNIQUE INDEX transactions_id ON transactions(id);
 CREATE INDEX transactions_payment_id ON transactions(payment_id);
 CREATE INDEX transactions_key ON transactions(transaction_external_key);
+CREATE INDEX transactions_status ON transactions(transaction_status);
 CREATE INDEX transactions_tenant_account_record_id ON transactions(tenant_record_id, account_record_id);
 
 DROP TABLE IF EXISTS transaction_history;
 CREATE TABLE transaction_history (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
     id char(36) NOT NULL,
+    attempt_id char(36) DEFAULT NULL,
     transaction_external_key varchar(255) NOT NULL,
     target_record_id int(11) unsigned NOT NULL,
     transaction_type varchar(32) NOT NULL,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryAuthorizeOperationCallback.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryAuthorizeOperationCallback.java
index bc0c24b..b9f59c3 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryAuthorizeOperationCallback.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryAuthorizeOperationCallback.java
@@ -66,6 +66,7 @@ public class MockRetryAuthorizeOperationCallback extends RetryAuthorizeOperation
 
         final PaymentTransactionModelDao transaction = new PaymentTransactionModelDao(clock.getUTCNow(),
                                                                                       clock.getUTCNow(),
+                                                                                      directPaymentStateContext.getAttemptId(),
                                                                                       directPaymentStateContext.directPaymentTransactionExternalKey,
                                                                                       directPaymentStateContext.directPaymentId,
                                                                                       directPaymentStateContext.transactionType,
@@ -77,6 +78,7 @@ public class MockRetryAuthorizeOperationCallback extends RetryAuthorizeOperation
                                                                                       "");
         final PaymentModelDao paymentModelDao = paymentDao.insertDirectPaymentWithFirstTransaction(payment, transaction, directPaymentStateContext.internalCallContext);
         final DirectPaymentTransaction convertedTransaction = new DefaultDirectPaymentTransaction(transaction.getId(),
+                                                                                                  directPaymentStateContext.getAttemptId(),
                                                                                                   transaction.getTransactionExternalKey(),
                                                                                                   transaction.getCreatedDate(),
                                                                                                   transaction.getUpdatedDate(),
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentAutomatonDAOHelper.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentAutomatonDAOHelper.java
index 447639f..a026985 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentAutomatonDAOHelper.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentAutomatonDAOHelper.java
@@ -126,6 +126,7 @@ public class TestDirectPaymentAutomatonDAOHelper extends PaymentTestSuiteWithEmb
         // No default payment method
 
         directPaymentStateContext = new DirectPaymentStateContext(directPaymentId,
+                                                                  null,
                                                                   directPaymentExternalKey,
                                                                   directPaymentTransactionExternalKey,
                                                                   TransactionType.CAPTURE,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentLeavingStateCallback.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentLeavingStateCallback.java
index 2e93c53..3e0e11c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentLeavingStateCallback.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentLeavingStateCallback.java
@@ -96,6 +96,7 @@ public class TestDirectPaymentLeavingStateCallback extends PaymentTestSuiteWithE
         final Account account = Mockito.mock(Account.class);
         Mockito.when(account.getId()).thenReturn(UUID.randomUUID());
         directPaymentStateContext = new DirectPaymentStateContext(directPaymentId,
+                                                                  null,
                                                                   UUID.randomUUID().toString(),
                                                                   UUID.randomUUID().toString(),
                                                                   TransactionType.CAPTURE,
@@ -120,6 +121,7 @@ public class TestDirectPaymentLeavingStateCallback extends PaymentTestSuiteWithE
             );
             final PaymentTransactionModelDao newPaymentTransactionModelDao = new PaymentTransactionModelDao(clock.getUTCNow(),
                                                                                                             clock.getUTCNow(),
+                                                                                                            null,
                                                                                                             directPaymentStateContext.getDirectPaymentTransactionExternalKey(),
                                                                                                             directPaymentId,
                                                                                                             directPaymentStateContext.getTransactionType(),
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentOperation.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentOperation.java
index e0deb5b..0dbb4ff 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentOperation.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestDirectPaymentOperation.java
@@ -104,6 +104,7 @@ public class TestDirectPaymentOperation extends PaymentTestSuiteNoDB {
         final GlobalLocker locker = new MemoryGlobalLocker();
         final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(1, Executors.newCachedThreadPool());
         directPaymentStateContext = new DirectPaymentStateContext(UUID.randomUUID(),
+                                                                  null,
                                                                   UUID.randomUUID().toString(),
                                                                   UUID.randomUUID().toString(),
                                                                   TransactionType.CAPTURE,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
index 08e0fa3..d13f814 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestPluginOperation.java
@@ -202,6 +202,7 @@ public class TestPluginOperation extends PaymentTestSuiteNoDB {
         final PluginDispatcher<OperationResult> paymentPluginDispatcher = new PluginDispatcher<OperationResult>(timeoutSeconds, Executors.newCachedThreadPool());
 
         final DirectPaymentStateContext directPaymentStateContext = new DirectPaymentStateContext(UUID.randomUUID(),
+                                                                                                  null,
                                                                                                   UUID.randomUUID().toString(),
                                                                                                   UUID.randomUUID().toString(),
                                                                                                   TransactionType.CAPTURE,
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryableDirectPayment.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryableDirectPayment.java
index 3c9d8d1..42bc3b4 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryableDirectPayment.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryableDirectPayment.java
@@ -621,7 +621,7 @@ public class TestRetryableDirectPayment extends PaymentTestSuiteNoDB {
                                                       internalCallContext
                                                      );
         paymentDao.insertDirectPaymentWithFirstTransaction(new PaymentModelDao(directPaymentId, utcNow, utcNow, account.getId(), paymentMethodId, -1, directPaymentExternalKey),
-                                                           new PaymentTransactionModelDao(directTransactionId, directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow, TransactionStatus.PAYMENT_FAILURE, amount, currency, "bla", "foo"),
+                                                           new PaymentTransactionModelDao(directTransactionId, attempt.getId(), directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow, TransactionStatus.PAYMENT_FAILURE, amount, currency, "bla", "foo"),
                                                            internalCallContext);
 
         processor.retryPaymentTransaction(attempt.getId(), MockPaymentControlProviderPlugin.PLUGIN_NAME, internalCallContext);
@@ -663,7 +663,7 @@ public class TestRetryableDirectPayment extends PaymentTestSuiteNoDB {
                                                       internalCallContext
                                                      );
         paymentDao.insertDirectPaymentWithFirstTransaction(new PaymentModelDao(directPaymentId, utcNow, utcNow, account.getId(), paymentMethodId, -1, directPaymentExternalKey),
-                                                           new PaymentTransactionModelDao(directTransactionId, directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow,
+                                                           new PaymentTransactionModelDao(directTransactionId, attempt.getId(), directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow,
                                                                                           TransactionStatus.PAYMENT_FAILURE, amount, currency, "bla", "foo"),
                                                            internalCallContext
                                                           );
@@ -711,7 +711,7 @@ public class TestRetryableDirectPayment extends PaymentTestSuiteNoDB {
                                                           internalCallContext
                                                          );
             paymentDao.insertDirectPaymentWithFirstTransaction(new PaymentModelDao(directPaymentId, utcNow, utcNow, account.getId(), paymentMethodId, -1, directPaymentExternalKey),
-                                                               new PaymentTransactionModelDao(directTransactionId, directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow,
+                                                               new PaymentTransactionModelDao(directTransactionId, attempt.getId(), directPaymentTransactionExternalKey, utcNow, utcNow, directPaymentId, TransactionType.AUTHORIZE, utcNow,
                                                                                               TransactionStatus.PAYMENT_FAILURE, amount, currency, "bla", "foo"),
                                                                internalCallContext
                                                               );
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/TestDirectPaymentProcessor.java b/payment/src/test/java/org/killbill/billing/payment/core/TestDirectPaymentProcessor.java
index cfa5141..a36d35e 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/TestDirectPaymentProcessor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/TestDirectPaymentProcessor.java
@@ -75,7 +75,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // AUTH pre-3DS
         final String authorizationKey = UUID.randomUUID().toString();
-        final DirectPayment authorization = directPaymentProcessor.createAuthorization(true, account, null, null, TEN, CURRENCY, directPaymentExternalKey, authorizationKey,
+        final DirectPayment authorization = directPaymentProcessor.createAuthorization(true, null, account, null, null, TEN, CURRENCY, directPaymentExternalKey, authorizationKey,
                                                                                        SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(authorization, directPaymentExternalKey, TEN, ZERO, ZERO, 1);
         final UUID directPaymentId = authorization.getId();
@@ -84,7 +84,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // AUTH post-3DS
         final String authorizationPost3DSKey = UUID.randomUUID().toString();
-        final DirectPayment authorizationPost3DS = directPaymentProcessor.createAuthorization(true, account, null, directPaymentId, TEN, CURRENCY, directPaymentExternalKey, authorizationPost3DSKey,
+        final DirectPayment authorizationPost3DS = directPaymentProcessor.createAuthorization(true, null, account, null, directPaymentId, TEN, CURRENCY, directPaymentExternalKey, authorizationPost3DSKey,
                                                                                               SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(authorizationPost3DS, directPaymentExternalKey, TEN, ZERO, ZERO, 2);
         verifyDirectPaymentTransaction(authorizationPost3DS.getTransactions().get(1), authorizationPost3DSKey, TransactionType.AUTHORIZE, TEN, directPaymentId);
@@ -92,7 +92,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // CAPTURE
         final String capture1Key = UUID.randomUUID().toString();
-        final DirectPayment partialCapture1 = directPaymentProcessor.createCapture(true, account, directPaymentId, FIVE, CURRENCY, capture1Key,
+        final DirectPayment partialCapture1 = directPaymentProcessor.createCapture(true, null, account, directPaymentId, FIVE, CURRENCY, capture1Key,
                                                                                    SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(partialCapture1, directPaymentExternalKey, TEN, FIVE, ZERO, 3);
         verifyDirectPaymentTransaction(partialCapture1.getTransactions().get(2), capture1Key, TransactionType.CAPTURE, FIVE, directPaymentId);
@@ -100,7 +100,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // CAPTURE
         final String capture2Key = UUID.randomUUID().toString();
-        final DirectPayment partialCapture2 = directPaymentProcessor.createCapture(true, account, directPaymentId, FIVE, CURRENCY, capture2Key,
+        final DirectPayment partialCapture2 = directPaymentProcessor.createCapture(true, null, account, directPaymentId, FIVE, CURRENCY, capture2Key,
                                                                                    SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(partialCapture2, directPaymentExternalKey, TEN, TEN, ZERO, 4);
         verifyDirectPaymentTransaction(partialCapture2.getTransactions().get(3), capture2Key, TransactionType.CAPTURE, FIVE, directPaymentId);
@@ -108,7 +108,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // REFUND
         final String refund1Key = UUID.randomUUID().toString();
-        final DirectPayment partialRefund1 = directPaymentProcessor.createRefund(true, account, directPaymentId, FIVE, CURRENCY, refund1Key,
+        final DirectPayment partialRefund1 = directPaymentProcessor.createRefund(true, null, account, directPaymentId, FIVE, CURRENCY, refund1Key,
                                                                                  SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(partialRefund1, directPaymentExternalKey, TEN, TEN, FIVE, 5);
         verifyDirectPaymentTransaction(partialRefund1.getTransactions().get(4), refund1Key, TransactionType.REFUND, FIVE, directPaymentId);
@@ -116,7 +116,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // REFUND
         final String refund2Key = UUID.randomUUID().toString();
-        final DirectPayment partialRefund2 = directPaymentProcessor.createRefund(true, account, directPaymentId, FIVE, CURRENCY, refund2Key,
+        final DirectPayment partialRefund2 = directPaymentProcessor.createRefund(true, null, account, directPaymentId, FIVE, CURRENCY, refund2Key,
                                                                                  SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(partialRefund2, directPaymentExternalKey, TEN, TEN, TEN, 6);
         verifyDirectPaymentTransaction(partialRefund2.getTransactions().get(5), refund2Key, TransactionType.REFUND, FIVE, directPaymentId);
@@ -129,7 +129,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // AUTH
         final String authorizationKey = UUID.randomUUID().toString();
-        final DirectPayment authorization = directPaymentProcessor.createAuthorization(true, account, null, null, TEN, CURRENCY, directPaymentExternalKey, authorizationKey,
+        final DirectPayment authorization = directPaymentProcessor.createAuthorization(true, null, account, null, null, TEN, CURRENCY, directPaymentExternalKey, authorizationKey,
                                                                                        SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(authorization, directPaymentExternalKey, TEN, ZERO, ZERO, 1);
         final UUID directPaymentId = authorization.getId();
@@ -138,7 +138,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // VOID
         final String voidKey = UUID.randomUUID().toString();
-        final DirectPayment voidTransaction = directPaymentProcessor.createVoid(true, account, directPaymentId, voidKey,
+        final DirectPayment voidTransaction = directPaymentProcessor.createVoid(true, null, account, directPaymentId, voidKey,
                                                                                 SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(voidTransaction, directPaymentExternalKey, TEN, ZERO, ZERO, 2);
         verifyDirectPaymentTransaction(voidTransaction.getTransactions().get(1), voidKey, TransactionType.VOID, null, directPaymentId);
@@ -151,7 +151,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // PURCHASE
         final String purchaseKey = UUID.randomUUID().toString();
-        final DirectPayment purchase = directPaymentProcessor.createPurchase(true, account, null, null, TEN, CURRENCY, directPaymentExternalKey, purchaseKey,
+        final DirectPayment purchase = directPaymentProcessor.createPurchase(true, null, account, null, null, TEN, CURRENCY, directPaymentExternalKey, purchaseKey,
                                                                              SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(purchase, directPaymentExternalKey, ZERO, ZERO, ZERO, 1);
         final UUID directPaymentId = purchase.getId();
@@ -165,7 +165,7 @@ public class TestDirectPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         // CREDIT
         final String creditKey = UUID.randomUUID().toString();
-        final DirectPayment purchase = directPaymentProcessor.createCredit(true, account, null, null, TEN, CURRENCY, directPaymentExternalKey, creditKey,
+        final DirectPayment purchase = directPaymentProcessor.createCredit(true, null, account, null, null, TEN, CURRENCY, directPaymentExternalKey, creditKey,
                                                                            SHOULD_LOCK_ACCOUNT, PLUGIN_PROPERTIES, callContext, internalCallContext);
         verifyDirectPayment(purchase, directPaymentExternalKey, ZERO, ZERO, ZERO, 1);
         final UUID directPaymentId = purchase.getId();
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 6ea5e88..6c6946f 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -51,14 +51,18 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context) {
+    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context) {
+        int result = 0;
         synchronized (transactions) {
             for (PaymentTransactionModelDao cur : transactions.values()) {
                 cur.setTransactionStatus(newTransactionStatus);
+                result++;
             }
         }
+        return result;
     }
 
+
     @Override
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(final PaymentAttemptModelDao attempt, final InternalCallContext context) {
         synchronized (this) {
@@ -86,6 +90,11 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
+    public List<PaymentAttemptModelDao> getPaymentAttemptsByState(final String stateName, final DateTime createdBeforeDate, final InternalTenantContext context) {
+        return null;
+    }
+
+    @Override
     public List<PaymentAttemptModelDao> getPaymentAttempts(final String paymentExternalKey, final InternalTenantContext context) {
         synchronized (this) {
             final List<PaymentAttemptModelDao> result = new ArrayList<PaymentAttemptModelDao>();
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestDefaultPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestDefaultPaymentDao.java
index 769bb22..3036806 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestDefaultPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestDefaultPaymentDao.java
@@ -120,6 +120,7 @@ public class TestDefaultPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
 
     private PaymentTransactionModelDao generateDirectPaymentTransactionModelDao(final UUID directPaymentId) {
         return new PaymentTransactionModelDao(UUID.randomUUID(),
+                                              null,
                                               UUID.randomUUID().toString(),
                                               clock.getUTCNow(),
                                               clock.getUTCNow(),
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 359e00b..44b8be2 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -107,7 +107,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final DateTime utcNow = clock.getUTCNow();
 
         final PaymentModelDao paymentModelDao = new PaymentModelDao(utcNow, utcNow, accountId, paymentMethodId, externalKey);
-        final PaymentTransactionModelDao transactionModelDao = new PaymentTransactionModelDao(utcNow, utcNow, transactionExternalKey,
+        final PaymentTransactionModelDao transactionModelDao = new PaymentTransactionModelDao(utcNow, utcNow, null, transactionExternalKey,
                                                                                               paymentModelDao.getId(), TransactionType.AUTHORIZE, utcNow,
                                                                                               TransactionStatus.SUCCESS, BigDecimal.TEN, Currency.AED,
                                                                                               "success", "");
@@ -151,7 +151,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(savedTransaction2.getAmount().compareTo(BigDecimal.TEN), 0);
         assertEquals(savedTransaction2.getCurrency(), Currency.AED);
 
-        final PaymentTransactionModelDao transactionModelDao2 = new PaymentTransactionModelDao(utcNow, utcNow, transactionExternalKey2,
+        final PaymentTransactionModelDao transactionModelDao2 = new PaymentTransactionModelDao(utcNow, utcNow, null, transactionExternalKey2,
                                                                                                paymentModelDao.getId(), TransactionType.AUTHORIZE, utcNow,
                                                                                                TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
                                                                                                "success", "");
@@ -265,20 +265,20 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final DateTime initialTime = clock.getUTCNow();
 
         final PaymentModelDao paymentModelDao = new PaymentModelDao(initialTime, initialTime, accountId, paymentMethodId, externalKey);
-        final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey1,
+        final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey1,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
                                                                                        TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
                                                                                        "pending", "");
 
         paymentDao.insertDirectPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
 
-        final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey2,
+        final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey2,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
                                                                                        TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
                                                                                        "pending", "");
         paymentDao.updateDirectPaymentWithNewTransaction(paymentModelDao.getId(), transaction2, internalCallContext);
 
-        final PaymentTransactionModelDao transaction3 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey3,
+        final PaymentTransactionModelDao transaction3 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey3,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
                                                                                        TransactionStatus.SUCCESS, BigDecimal.TEN, Currency.AED,
                                                                                        "success", "");
@@ -294,7 +294,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
                                                                                         UserType.TEST, "Testing", "This is a test",
                                                                                         newTime, newTime);
 
-        final PaymentTransactionModelDao transaction4 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey4,
+        final PaymentTransactionModelDao transaction4 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey4,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, newTime,
                                                                                        TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
                                                                                        "pending", "");
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
new file mode 100644
index 0000000..78ad763
--- /dev/null
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.payment.api.DirectPayment;
+import org.killbill.billing.payment.api.DirectPaymentTransaction;
+import org.killbill.billing.payment.api.PaymentApiException;
+import org.killbill.billing.payment.api.PaymentOptions;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.control.InvoicePaymentControlPluginApi;
+import org.killbill.billing.payment.core.Janitor;
+import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
+import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
+import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
+
+    final PaymentOptions INVOICE_PAYMENT = new PaymentOptions() {
+        @Override
+        public boolean isExternalPayment() {
+            return false;
+        }
+
+        @Override
+        public String getPaymentControlPluginName() {
+            return InvoicePaymentControlPluginApi.PLUGIN_NAME;
+        }
+    };
+
+
+    @Inject
+    private Janitor janitor;
+
+    private Account account;
+
+    @Override
+    protected KillbillConfigSource getConfigSource() {
+        return getConfigSource("/payment.properties",
+                               ImmutableMap.<String, String>of("org.killbill.payment.provider.default", MockPaymentProviderPlugin.PLUGIN_NAME,
+                                                               "killbill.payment.engine.events.off", "false",
+                                                               "org.killbill.payment.janitor.rate", "500ms")
+                              );
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void beforeMethod() throws Exception {
+        super.beforeMethod();
+        janitor.start();
+        account = testHelper.createTestAccount("bobo@gmail.com", true);
+    }
+
+    @AfterMethod(groups = "slow")
+    public void afterMethod() throws Exception {
+        super.afterMethod();
+        janitor.stop();
+    }
+
+    @Test(groups = "slow")
+    public void testCreateSuccessPurchaseWithPaymentControl() throws PaymentApiException, InvoiceApiException, EventBusException {
+
+        final BigDecimal requestedAmount = BigDecimal.TEN;
+        final UUID subscriptionId = UUID.randomUUID();
+        final UUID bundleId = UUID.randomUUID();
+        final LocalDate now = clock.getUTCToday();
+
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+
+        final String paymentExternalKey = invoice.getId().toString();
+        final String transactionExternalKey = "wouf wouf";
+
+        invoice.addInvoiceItem(new MockRecurringInvoiceItem(invoice.getId(), account.getId(),
+                                                            subscriptionId,
+                                                            bundleId,
+                                                            "test plan",
+                                                            "test phase", null,
+                                                            now,
+                                                            now.plusMonths(1),
+                                                            requestedAmount,
+                                                            new BigDecimal("1.0"),
+                                                            Currency.USD));
+
+        final DirectPayment payment = paymentApi.createPurchaseWithPaymentControl(account, account.getPaymentMethodId(), null, requestedAmount, Currency.USD, paymentExternalKey, transactionExternalKey,
+                                                                                  createPropertiesForInvoice(invoice), INVOICE_PAYMENT, callContext);
+        assertEquals(payment.getTransactions().size(), 1);
+        assertEquals(payment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
+        assertEquals(payment.getTransactions().get(0).getTransactionType(), TransactionType.PURCHASE);
+
+        final List<PaymentAttemptModelDao> attempts = paymentDao.getPaymentAttempts(paymentExternalKey, internalCallContext);
+        assertEquals(attempts.size(), 1);
+
+        final PaymentAttemptModelDao attempt = attempts.get(0);
+        assertEquals(attempt.getStateName(), "SUCCESS");
+
+        // Ok now the fun part starts... we modify the attempt state to be 'INIT' and wait the the Janitor to do its job.
+        paymentDao.updatePaymentAttempt(attempt.getId(), attempt.getTransactionId(), "INIT", internalCallContext);
+        final PaymentAttemptModelDao attempt2 = paymentDao.getPaymentAttempt(attempt.getId(), internalCallContext);
+        assertEquals(attempt2.getStateName(), "INIT");
+
+        clock.addDays(1);
+        try { Thread.sleep(1500); } catch (InterruptedException e) {};
+
+        final PaymentAttemptModelDao attempt3 = paymentDao.getPaymentAttempt(attempt.getId(), internalCallContext);
+        assertEquals(attempt3.getStateName(), "SUCCESS");
+    }
+
+    @Test(groups = "slow")
+    public void testCreateSuccessRefundPaymentControlWithItemAdjustments() throws PaymentApiException, InvoiceApiException, EventBusException {
+
+        final BigDecimal requestedAmount = BigDecimal.TEN;
+        final UUID subscriptionId = UUID.randomUUID();
+        final UUID bundleId = UUID.randomUUID();
+        final LocalDate now = clock.getUTCToday();
+
+        final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
+
+        final String paymentExternalKey = invoice.getId().toString();
+        final String transactionExternalKey = "craboom";
+        final String transactionExternalKey2 = "qwerty";
+
+        final InvoiceItem invoiceItem = new MockRecurringInvoiceItem(invoice.getId(), account.getId(),
+                                                                     subscriptionId,
+                                                                     bundleId,
+                                                                     "test plan", "test phase", null,
+                                                                     now,
+                                                                     now.plusMonths(1),
+                                                                     requestedAmount,
+                                                                     new BigDecimal("1.0"),
+                                                                     Currency.USD);
+        invoice.addInvoiceItem(invoiceItem);
+
+        final DirectPayment payment = paymentApi.createPurchaseWithPaymentControl(account, account.getPaymentMethodId(), null, requestedAmount, Currency.USD, paymentExternalKey, transactionExternalKey,
+                                                                                  createPropertiesForInvoice(invoice), INVOICE_PAYMENT, callContext);
+
+        final List<PluginProperty> refundProperties = new ArrayList<PluginProperty>();
+        final HashMap<UUID, BigDecimal> uuidBigDecimalHashMap = new HashMap<UUID, BigDecimal>();
+        uuidBigDecimalHashMap.put(invoiceItem.getId(), new BigDecimal("1.0"));
+        final PluginProperty refundIdsProp = new PluginProperty(InvoicePaymentControlPluginApi.PROP_IPCD_REFUND_IDS_WITH_AMOUNT_KEY, uuidBigDecimalHashMap, false);
+        refundProperties.add(refundIdsProp);
+
+        final DirectPayment payment2 = paymentApi.createRefundWithPaymentControl(account, payment.getId(), null, Currency.USD, transactionExternalKey2,
+                                                                                 refundProperties, INVOICE_PAYMENT, callContext);
+
+        assertEquals(payment2.getTransactions().size(), 2);
+        DirectPaymentTransaction refundTransaction = payment2.getTransactions().get(1);
+        assertEquals(refundTransaction.getTransactionType(), TransactionType.REFUND);
+
+        final List<PaymentAttemptModelDao> attempts = paymentDao.getPaymentAttempts(paymentExternalKey, internalCallContext);
+        assertEquals(attempts.size(), 2);
+
+        final PaymentAttemptModelDao refundAttempt = attempts.get(1);
+        assertEquals(refundAttempt.getTransactionType(), TransactionType.REFUND);
+
+        // Ok now the fun part starts... we modify the attempt state to be 'INIT' and wait the the Janitor to do its job.
+        paymentDao.updatePaymentAttempt(refundAttempt.getId(), refundAttempt.getTransactionId(), "INIT", internalCallContext);
+        final PaymentAttemptModelDao attempt2 = paymentDao.getPaymentAttempt(refundAttempt.getId(), internalCallContext);
+        assertEquals(attempt2.getStateName(), "INIT");
+
+        clock.addDays(1);
+        try { Thread.sleep(1500); } catch (InterruptedException e) {};
+
+        final PaymentAttemptModelDao attempt3 = paymentDao.getPaymentAttempt(refundAttempt.getId(), internalCallContext);
+        assertEquals(attempt3.getStateName(), "SUCCESS");
+
+    }
+
+
+
+    private List<PluginProperty> createPropertiesForInvoice(final Invoice invoice) {
+        final List<PluginProperty> result = new ArrayList<PluginProperty>();
+        result.add(new PluginProperty(InvoicePaymentControlPluginApi.PROP_IPCD_INVOICE_ID, invoice.getId().toString(), false));
+        return result;
+    }
+}
+
diff --git a/util/src/main/java/org/killbill/billing/util/callcontext/InternalCallContextFactory.java b/util/src/main/java/org/killbill/billing/util/callcontext/InternalCallContextFactory.java
index 6aa8259..71a1fe9 100644
--- a/util/src/main/java/org/killbill/billing/util/callcontext/InternalCallContextFactory.java
+++ b/util/src/main/java/org/killbill/billing/util/callcontext/InternalCallContextFactory.java
@@ -22,14 +22,13 @@ import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
-
 import org.killbill.billing.ObjectType;
-import org.killbill.clock.Clock;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.util.cache.Cachable.CacheType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.clock.Clock;
 
 import com.google.common.base.Objects;
 
@@ -88,6 +87,12 @@ public class InternalCallContextFactory {
         return new InternalTenantContext(tenantRecordId, accountRecordId);
     }
 
+    public InternalTenantContext createInternalTenantContext(final UUID accountId, final UUID objectId, final ObjectType objectType) {
+        final Long tenantRecordId = getTenantRecordId(objectId, objectType);
+        final Long accountRecordId = getAccountRecordId(accountId, ObjectType.ACCOUNT);
+        return new InternalTenantContext(tenantRecordId, accountRecordId);
+    }
+
     /**
      * Crate an internal tenant callcontext from a tenant callcontext, and retrieving the account_record_id from another table
      *
@@ -250,6 +255,10 @@ public class InternalCallContextFactory {
         return nonEntityDao.retrieveAccountRecordIdFromObject(objectId, objectType, cacheControllerDispatcher.getCacheController(CacheType.ACCOUNT_RECORD_ID));
     }
 
+    private Long getTenantRecordId(final UUID objectId, final ObjectType objectType) {
+        return nonEntityDao.retrieveTenantRecordIdFromObject(objectId, objectType, cacheControllerDispatcher.getCacheController(CacheType.TENANT_RECORD_ID));
+    }
+
     private Long getTenantRecordId(final TenantContext context) {
         // Default to single default tenant (e.g. single tenant mode)
         // TODO Extract this convention (e.g. BusinessAnalyticsBase needs to know about it)
diff --git a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
index 67f5014..ef042d0 100644
--- a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
@@ -70,6 +70,11 @@ public interface PaymentConfig extends KillbillConfig {
     @Description("Delay after which pending transactions should be marked as failed")
     public TimeSpan getJanitorPendingCleanupTime();
 
+    @Config("org.killbill.payment.janitor.attempts")
+    @Default("15m")
+    @Description("Delay after which incomplete  attempts should be completed")
+    public TimeSpan getJanitorAttemptCompletionTime();
+
     @Config("org.killbill.payment.janitor.rate")
     @Default("1h")
     @Description("Rate at which janitor tasks are scheduled")