killbill-memoizeit

Merge pull request #1018 from killbill/fix-for-1017 payment:

6/29/2018 6:19:26 PM

Details

diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestInvoicePayment.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestInvoicePayment.java
index 4eca162..8b424b0 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestInvoicePayment.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestInvoicePayment.java
@@ -54,6 +54,7 @@ import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.invoice.InvoicePaymentControlPluginApi;
+import org.killbill.billing.payment.plugin.api.PaymentPluginStatus;
 import org.killbill.xmlloader.XMLLoader;
 import org.mockito.Mockito;
 import org.skife.jdbi.v2.Handle;
@@ -1230,4 +1231,113 @@ public class TestInvoicePayment extends TestIntegrationBase {
         assertEquals(payments2.get(0).getPaymentAttempts().get(0).getPluginName(), InvoicePaymentControlPluginApi.PLUGIN_NAME);
         assertEquals(payments2.get(0).getPaymentAttempts().get(0).getStateName(), "SUCCESS");
     }
+
+    @Test(groups = "slow")
+    public void testWithUNKNOWNPaymentFixedToSuccessViaJanitorFlow() throws Exception {
+        // Verify integration with Overdue in that particular test
+        final String configXml = "<overdueConfig>" +
+                                 "   <accountOverdueStates>" +
+                                 "       <initialReevaluationInterval>" +
+                                 "           <unit>DAYS</unit><number>1</number>" +
+                                 "       </initialReevaluationInterval>" +
+                                 "       <state name=\"OD1\">" +
+                                 "           <condition>" +
+                                 "               <timeSinceEarliestUnpaidInvoiceEqualsOrExceeds>" +
+                                 "                   <unit>DAYS</unit><number>1</number>" +
+                                 "               </timeSinceEarliestUnpaidInvoiceEqualsOrExceeds>" +
+                                 "           </condition>" +
+                                 "           <externalMessage>Reached OD1</externalMessage>" +
+                                 "           <blockChanges>true</blockChanges>" +
+                                 "           <disableEntitlementAndChangesBlocked>false</disableEntitlementAndChangesBlocked>" +
+                                 "       </state>" +
+                                 "   </accountOverdueStates>" +
+                                 "</overdueConfig>";
+        final InputStream is = new ByteArrayInputStream(configXml.getBytes());
+        final DefaultOverdueConfig config = XMLLoader.getObjectFromStreamNoValidation(is, DefaultOverdueConfig.class);
+        overdueConfigCache.loadDefaultOverdueConfig(config);
+
+        clock.setDay(new LocalDate(2012, 4, 1));
+
+        final AccountData accountData = getAccountData(1);
+        final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+        accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+        checkODState(OverdueWrapper.CLEAR_STATE_NAME, account.getId());
+
+        paymentPlugin.makeNextPaymentUnknown();
+
+        final DefaultEntitlement baseEntitlement = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Shotgun", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK, NextEvent.INVOICE);
+
+        addDaysAndCheckForCompletion(30, NextEvent.PHASE, NextEvent.INVOICE, NextEvent.PAYMENT_PLUGIN_ERROR, NextEvent.INVOICE_PAYMENT_ERROR);
+
+        invoiceChecker.checkChargedThroughDate(baseEntitlement.getId(), new LocalDate(2012, 6, 1), callContext);
+
+        final List<Invoice> invoices = invoiceUserApi.getInvoicesByAccount(account.getId(), false, false, callContext);
+        assertEquals(invoices.size(), 2);
+
+        final Invoice invoice1 = invoices.get(0).getInvoiceItems().get(0).getInvoiceItemType() == InvoiceItemType.RECURRING ?
+                                 invoices.get(0) : invoices.get(1);
+        assertTrue(invoice1.getBalance().compareTo(new BigDecimal("249.95")) == 0);
+        assertTrue(invoice1.getPaidAmount().compareTo(BigDecimal.ZERO) == 0);
+        assertTrue(invoice1.getChargedAmount().compareTo(new BigDecimal("249.95")) == 0);
+        assertEquals(invoice1.getPayments().size(), 1);
+        assertEquals(invoice1.getPayments().get(0).getAmount().compareTo(BigDecimal.ZERO), 0);
+        assertEquals(invoice1.getPayments().get(0).getCurrency(), Currency.USD);
+        assertFalse(invoice1.getPayments().get(0).isSuccess());
+        assertNotNull(invoice1.getPayments().get(0).getPaymentId());
+
+        final BigDecimal accountBalance1 = invoiceUserApi.getAccountBalance(account.getId(), callContext);
+        assertTrue(accountBalance1.compareTo(new BigDecimal("249.95")) == 0);
+
+        final List<Payment> payments = paymentApi.getAccountPayments(account.getId(), false, true, ImmutableList.<PluginProperty>of(), callContext);
+        assertEquals(payments.size(), 1);
+        assertEquals(payments.get(0).getPurchasedAmount().compareTo(BigDecimal.ZERO), 0);
+        assertEquals(payments.get(0).getTransactions().size(), 1);
+        assertEquals(payments.get(0).getTransactions().get(0).getAmount().compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(payments.get(0).getTransactions().get(0).getCurrency(), Currency.USD);
+        assertEquals(payments.get(0).getTransactions().get(0).getProcessedAmount().compareTo(BigDecimal.ZERO), 0);
+        assertEquals(payments.get(0).getTransactions().get(0).getProcessedCurrency(), Currency.USD);
+        assertEquals(payments.get(0).getTransactions().get(0).getTransactionStatus(), TransactionStatus.UNKNOWN);
+        assertEquals(payments.get(0).getPaymentAttempts().size(), 1);
+        assertEquals(payments.get(0).getPaymentAttempts().get(0).getPluginName(), InvoicePaymentControlPluginApi.PLUGIN_NAME);
+        assertEquals(payments.get(0).getPaymentAttempts().get(0).getStateName(), "ABORTED");
+
+        // Verify account transitions to OD1
+        addDaysAndCheckForCompletion(2, NextEvent.BLOCK);
+        checkODState("OD1", account.getId());
+
+        // Transition the payment to success (Janitor flow)
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT, NextEvent.BLOCK);
+        paymentPlugin.overridePaymentPluginStatus(payments.get(0).getId(), payments.get(0).getTransactions().get(0).getId(), PaymentPluginStatus.PROCESSED);
+        paymentApi.getPayment(payments.get(0).getId(), true, true, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        checkODState(OverdueWrapper.CLEAR_STATE_NAME, account.getId());
+
+        final Invoice invoice2 = invoiceUserApi.getInvoice(invoice1.getId(), callContext);
+        assertTrue(invoice2.getBalance().compareTo(BigDecimal.ZERO) == 0);
+        assertTrue(invoice2.getPaidAmount().compareTo(new BigDecimal("249.95")) == 0);
+        assertTrue(invoice2.getChargedAmount().compareTo(new BigDecimal("249.95")) == 0);
+        assertEquals(invoice2.getPayments().size(), 1);
+        assertEquals(invoice2.getPayments().get(0).getAmount().compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(invoice2.getPayments().get(0).getCurrency(), Currency.USD);
+        assertTrue(invoice2.getPayments().get(0).isSuccess());
+        assertNotNull(invoice2.getPayments().get(0).getPaymentId());
+
+        final BigDecimal accountBalance2 = invoiceUserApi.getAccountBalance(account.getId(), callContext);
+        assertTrue(accountBalance2.compareTo(BigDecimal.ZERO) == 0);
+
+        final List<Payment> payments2 = paymentApi.getAccountPayments(account.getId(), true, true, ImmutableList.<PluginProperty>of(), callContext);
+        assertEquals(payments2.size(), 1);
+        assertEquals(payments2.get(0).getPurchasedAmount().compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(payments2.get(0).getTransactions().size(), 1);
+        assertEquals(payments2.get(0).getTransactions().get(0).getAmount().compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(payments2.get(0).getTransactions().get(0).getCurrency(), Currency.USD);
+        assertEquals(payments2.get(0).getTransactions().get(0).getProcessedAmount().compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(payments2.get(0).getTransactions().get(0).getProcessedCurrency(), Currency.USD);
+        assertEquals(payments2.get(0).getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
+        assertEquals(payments2.get(0).getPaymentAttempts().size(), 1);
+        assertEquals(payments2.get(0).getPaymentAttempts().get(0).getPluginName(), InvoicePaymentControlPluginApi.PLUGIN_NAME);
+        assertEquals(payments2.get(0).getPaymentAttempts().get(0).getStateName(), "SUCCESS");
+    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
index 1418ef1..e4b513f 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
@@ -31,6 +31,7 @@ import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.core.PaymentControlAwareRefresher;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
 import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
@@ -61,18 +62,26 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultPaymentApi.class);
 
+    private final PaymentControlAwareRefresher paymentControlAwareRefresher;
     private final PaymentProcessor paymentProcessor;
     private final PaymentMethodProcessor paymentMethodProcessor;
     private final PluginControlPaymentProcessor pluginControlPaymentProcessor;
     private final PaymentDao paymentDao;
 
     @Inject
-    public DefaultPaymentApi(final PaymentConfig paymentConfig, final PaymentProcessor paymentProcessor, final PaymentMethodProcessor paymentMethodProcessor, final PluginControlPaymentProcessor pluginControlPaymentProcessor, final PaymentDao paymentDao, final InternalCallContextFactory internalCallContextFactory) {
+    public DefaultPaymentApi(final PaymentConfig paymentConfig,
+                             final PaymentProcessor paymentProcessor,
+                             final PaymentMethodProcessor paymentMethodProcessor,
+                             final PluginControlPaymentProcessor pluginControlPaymentProcessor,
+                             final PaymentDao paymentDao,
+                             final InternalCallContextFactory internalCallContextFactory,
+                             final PaymentControlAwareRefresher paymentControlAwareRefresher) {
         super(paymentConfig, internalCallContextFactory);
         this.paymentProcessor = paymentProcessor;
         this.paymentMethodProcessor = paymentMethodProcessor;
         this.pluginControlPaymentProcessor = pluginControlPaymentProcessor;
         this.paymentDao = paymentDao;
+        this.paymentControlAwareRefresher = paymentControlAwareRefresher;
     }
 
     @Override
@@ -926,22 +935,22 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
 
     @Override
     public List<Payment> getAccountPayments(final UUID accountId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext) throws PaymentApiException {
-        return paymentProcessor.getAccountPayments(accountId, withPluginInfo, withAttempts, tenantContext, internalCallContextFactory.createInternalTenantContext(accountId, tenantContext));
+        return paymentControlAwareRefresher.getAccountPayments(accountId, withPluginInfo, withAttempts, tenantContext, internalCallContextFactory.createInternalTenantContext(accountId, tenantContext));
     }
 
     @Override
     public Pagination<Payment> getPayments(final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) {
-        return paymentProcessor.getPayments(offset, limit, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
+        return paymentControlAwareRefresher.getPayments(offset, limit, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
     }
 
     @Override
     public Pagination<Payment> getPayments(final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext) throws PaymentApiException {
-        return paymentProcessor.getPayments(offset, limit, pluginName, withPluginInfo, withAttempts, properties, tenantContext, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
+        return paymentControlAwareRefresher.getPayments(offset, limit, pluginName, withPluginInfo, withAttempts, properties, tenantContext, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
     }
 
     @Override
     public Payment getPayment(final UUID paymentId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
-        final Payment payment = paymentProcessor.getPayment(paymentId, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContext(paymentId, ObjectType.PAYMENT, context));
+        final Payment payment = paymentControlAwareRefresher.getPayment(paymentId, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContext(paymentId, ObjectType.PAYMENT, context));
         if (payment == null) {
             throw new PaymentApiException(ErrorCode.PAYMENT_NO_SUCH_PAYMENT, paymentId);
         }
@@ -951,7 +960,7 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
     @Override
     public Payment getPaymentByExternalKey(final String paymentExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext)
             throws PaymentApiException {
-        final Payment payment = paymentProcessor.getPaymentByExternalKey(paymentExternalKey, withPluginInfo, withAttempts, properties, tenantContext, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
+        final Payment payment = paymentControlAwareRefresher.getPaymentByExternalKey(paymentExternalKey, withPluginInfo, withAttempts, properties, tenantContext, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(tenantContext));
         if (payment == null) {
             throw new PaymentApiException(ErrorCode.PAYMENT_NO_SUCH_PAYMENT, paymentExternalKey);
         }
@@ -960,12 +969,12 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
 
     @Override
     public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) {
-        return paymentProcessor.searchPayments(searchKey, offset, limit, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
+        return paymentControlAwareRefresher.searchPayments(searchKey, offset, limit, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
     }
 
     @Override
     public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
-        return paymentProcessor.searchPayments(searchKey, offset, limit, pluginName, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
+        return paymentControlAwareRefresher.searchPayments(searchKey, offset, limit, pluginName, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
     }
     @Override
     public UUID addPaymentMethod(final Account account, final String paymentMethodExternalKey, final String pluginName,
@@ -1078,7 +1087,7 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
 
     @Override
     public Payment getPaymentByTransactionId(final UUID transactionId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
-        final Payment payment = paymentProcessor.getPaymentByTransactionId(transactionId, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContext(transactionId, ObjectType.TRANSACTION, context));
+        final Payment payment = paymentControlAwareRefresher.getPaymentByTransactionId(transactionId, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContext(transactionId, ObjectType.TRANSACTION, context));
         if (payment == null) {
             throw new PaymentApiException(ErrorCode.PAYMENT_NO_SUCH_PAYMENT, transactionId);
         }
@@ -1087,7 +1096,7 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
 
     @Override
     public Payment getPaymentByTransactionExternalKey(final String transactionExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
-        final Payment payment = paymentProcessor.getPaymentByTransactionExternalKey(transactionExternalKey, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
+        final Payment payment = paymentControlAwareRefresher.getPaymentByTransactionExternalKey(transactionExternalKey, withPluginInfo, withAttempts, properties, context, internalCallContextFactory.createInternalTenantContextWithoutAccountRecordId(context));
         if (payment == null) {
             throw new PaymentApiException(ErrorCode.PAYMENT_NO_SUCH_PAYMENT, transactionExternalKey);
         }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentControlAwareRefresher.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentControlAwareRefresher.java
new file mode 100644
index 0000000..293a25d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentControlAwareRefresher.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2014-2018 Groupon, Inc
+ * Copyright 2014-2018 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentAttemptTask;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
+import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
+import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.tag.TagInternalApi;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+public class PaymentControlAwareRefresher extends PaymentRefresher {
+
+    private static final Logger log = LoggerFactory.getLogger(PaymentControlAwareRefresher.class);
+
+    private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
+
+    @Inject
+    public PaymentControlAwareRefresher(final PaymentPluginServiceRegistration paymentPluginServiceRegistration,
+                                        final AccountInternalApi accountUserApi,
+                                        final PaymentDao paymentDao,
+                                        final TagInternalApi tagUserApi,
+                                        final GlobalLocker locker,
+                                        final InternalCallContextFactory internalCallContextFactory,
+                                        final InvoiceInternalApi invoiceApi,
+                                        final Clock clock,
+                                        final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
+                                        final NotificationQueueService notificationQueueService,
+                                        final IncompletePaymentAttemptTask incompletePaymentAttemptTask) {
+        super(paymentPluginServiceRegistration, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock, notificationQueueService, incompletePaymentTransactionTask);
+        this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
+    }
+
+    @Override
+    protected void onJanitorChange(final PaymentTransactionModelDao curPaymentTransactionModelDao, final InternalTenantContext internalTenantContext) {
+        // If there is a payment attempt associated with that transaction, we need to update it as well
+        final List<PaymentAttemptModelDao> paymentAttemptsModelDao = paymentDao.getPaymentAttemptByTransactionExternalKey(curPaymentTransactionModelDao.getTransactionExternalKey(), internalTenantContext);
+        final PaymentAttemptModelDao paymentAttemptModelDao = Iterables.<PaymentAttemptModelDao>tryFind(paymentAttemptsModelDao,
+                                                                                                        new Predicate<PaymentAttemptModelDao>() {
+                                                                                                            @Override
+                                                                                                            public boolean apply(final PaymentAttemptModelDao input) {
+                                                                                                                return curPaymentTransactionModelDao.getId().equals(input.getTransactionId());
+                                                                                                            }
+                                                                                                        }).orNull();
+        if (paymentAttemptModelDao != null) {
+            // We can re-use the logic from IncompletePaymentAttemptTask as it is doing very similar work (i.e. run the completion part of
+            // the state machine to call the plugins and update the attempt in the right terminal state)
+            incompletePaymentAttemptTask.doIteration(paymentAttemptModelDao);
+        }
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index 2074576..d5cf41d 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -19,15 +19,10 @@
 package org.killbill.billing.payment.core;
 
 import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
@@ -40,20 +35,13 @@ import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
-import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
-import org.killbill.billing.payment.api.DefaultPayment;
-import org.killbill.billing.payment.api.DefaultPaymentAttempt;
-import org.killbill.billing.payment.api.DefaultPaymentTransaction;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentApiException;
-import org.killbill.billing.payment.api.PaymentAttempt;
-import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
-import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
 import org.killbill.billing.payment.core.sm.PaymentAutomatonDAOHelper;
 import org.killbill.billing.payment.core.sm.PaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.PaymentStateContext;
@@ -61,22 +49,14 @@ import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.dao.PaymentModelDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
-import org.killbill.billing.payment.dao.PluginPropertySerializer;
-import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
 import org.killbill.billing.payment.glue.DefaultPaymentService;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.payment.plugin.api.PaymentPluginApiException;
 import org.killbill.billing.payment.plugin.api.PaymentTransactionInfoPlugin;
 import org.killbill.billing.payment.retry.DefaultRetryService;
 import org.killbill.billing.payment.retry.PaymentRetryNotificationKey;
 import org.killbill.billing.tag.TagInternalApi;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.callcontext.TenantContext;
-import org.killbill.billing.util.entity.DefaultPagination;
-import org.killbill.billing.util.entity.Pagination;
-import org.killbill.billing.util.entity.dao.DefaultPaginationHelper.EntityPaginationBuilder;
-import org.killbill.billing.util.entity.dao.DefaultPaginationHelper.SourcePaginationBuilder;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
 import org.killbill.notificationq.api.NotificationEvent;
@@ -87,31 +67,20 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-
-import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
-import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;
 
 public class PaymentProcessor extends ProcessorBase {
 
+    private static final Logger log = LoggerFactory.getLogger(PaymentProcessor.class);
+
     private static final ImmutableList<PluginProperty> PLUGIN_PROPERTIES = ImmutableList.<PluginProperty>of();
 
     private final PaymentAutomatonRunner paymentAutomatonRunner;
-    private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
     private final NotificationQueueService notificationQueueService;
-
-    private static final Logger log = LoggerFactory.getLogger(PaymentProcessor.class);
-
-    public static final String SCHEDULED = "SCHEDULED";
-
+    private final PaymentRefresher paymentRefresher;
 
     @Inject
     public PaymentProcessor(final PaymentPluginServiceRegistration paymentPluginServiceRegistration,
@@ -122,13 +91,12 @@ public class PaymentProcessor extends ProcessorBase {
                             final InternalCallContextFactory internalCallContextFactory,
                             final GlobalLocker locker,
                             final PaymentAutomatonRunner paymentAutomatonRunner,
-                            final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
                             final NotificationQueueService notificationQueueService,
-                            final Clock clock) {
+                            final Clock clock, final PaymentRefresher paymentRefresher) {
         super(paymentPluginServiceRegistration, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
         this.paymentAutomatonRunner = paymentAutomatonRunner;
-        this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
         this.notificationQueueService = notificationQueueService;
+        this.paymentRefresher = paymentRefresher;
     }
 
     public Payment createAuthorization(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID paymentId, final BigDecimal amount, final Currency currency, @Nullable final DateTime effectiveDate,
@@ -190,200 +158,6 @@ public class PaymentProcessor extends ProcessorBase {
                                 overridePluginResult, PLUGIN_PROPERTIES, callContext, internalCallContext);
     }
 
-    public List<Payment> getAccountPayments(final UUID accountId, final boolean withPluginInfo, final boolean withAttempts, final TenantContext context, final InternalTenantContext tenantContext) throws PaymentApiException {
-        final List<PaymentModelDao> paymentsModelDao = paymentDao.getPaymentsForAccount(accountId, tenantContext);
-        final List<PaymentTransactionModelDao> transactionsModelDao = paymentDao.getTransactionsForAccount(accountId, tenantContext);
-
-        final Map<UUID, PaymentPluginApi> paymentPluginByPaymentMethodId = new HashMap<UUID, PaymentPluginApi>();
-        final Collection<UUID> absentPlugins = new HashSet<UUID>();
-        final List<Payment> transformedPayments = Lists.<PaymentModelDao, Payment>transform(paymentsModelDao,
-                                                                                            new Function<PaymentModelDao, Payment>() {
-                                                                                                @Override
-                                                                                                public Payment apply(final PaymentModelDao paymentModelDao) {
-                                                                                                    List<PaymentTransactionInfoPlugin> pluginInfo = null;
-
-                                                                                                    if (withPluginInfo) {
-                                                                                                        PaymentPluginApi pluginApi = paymentPluginByPaymentMethodId.get(paymentModelDao.getPaymentMethodId());
-                                                                                                        if (pluginApi == null && !absentPlugins.contains(paymentModelDao.getPaymentMethodId())) {
-                                                                                                            try {
-                                                                                                                pluginApi = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, tenantContext);
-                                                                                                                paymentPluginByPaymentMethodId.put(paymentModelDao.getPaymentMethodId(), pluginApi);
-                                                                                                            } catch (final PaymentApiException e) {
-                                                                                                                log.warn("Unable to retrieve pluginApi for payment method " + paymentModelDao.getPaymentMethodId());
-                                                                                                                absentPlugins.add(paymentModelDao.getPaymentMethodId());
-                                                                                                            }
-                                                                                                        }
-
-                                                                                                        pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, context);
-                                                                                                    }
-
-                                                                                                    return toPayment(paymentModelDao, transactionsModelDao, pluginInfo, withAttempts, tenantContext);
-                                                                                                }
-                                                                                            });
-
-        // Copy the transformed list, so the transformation function is applied once (otherwise, the Janitor could be invoked multiple times)
-        return ImmutableList.<Payment>copyOf(transformedPayments);
-    }
-
-    public Payment getPayment(final UUID paymentId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentId, internalTenantContext);
-        return getPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-    }
-
-    public Payment getPaymentByExternalKey(final String paymentExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final PaymentModelDao paymentModelDao = paymentDao.getPaymentByExternalKey(paymentExternalKey, internalTenantContext);
-        return getPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-    }
-
-    private Payment getPayment(final PaymentModelDao paymentModelDao, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        if (paymentModelDao == null) {
-            return null;
-        }
-        return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-    }
-
-    public Pagination<Payment> getPayments(final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts,
-                                           final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) {
-        final Map<UUID, Optional<PaymentPluginApi>> paymentMethodIdToPaymentPluginApi = new HashMap<UUID, Optional<PaymentPluginApi>>();
-
-        try {
-            return getEntityPagination(limit,
-                                       new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
-                                           @Override
-                                           public Pagination<PaymentModelDao> build() {
-                                               // Find all payments for all accounts
-                                               return paymentDao.get(offset, limit, internalTenantContext);
-                                           }
-                                       },
-                                       new Function<PaymentModelDao, Payment>() {
-                                           @Override
-                                           public Payment apply(final PaymentModelDao paymentModelDao) {
-                                               final PaymentPluginApi pluginApi;
-                                               if (!withPluginInfo) {
-                                                   pluginApi = null;
-                                               } else {
-                                                   if (paymentMethodIdToPaymentPluginApi.get(paymentModelDao.getPaymentMethodId()) == null) {
-                                                       try {
-                                                           final PaymentPluginApi paymentProviderPlugin = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, internalTenantContext);
-                                                           paymentMethodIdToPaymentPluginApi.put(paymentModelDao.getPaymentMethodId(), Optional.<PaymentPluginApi>of(paymentProviderPlugin));
-                                                       } catch (final PaymentApiException e) {
-                                                           log.warn("Unable to retrieve PaymentPluginApi for paymentMethodId='{}'", paymentModelDao.getPaymentMethodId(), e);
-                                                           // We use Optional to avoid printing the log line for each result
-                                                           paymentMethodIdToPaymentPluginApi.put(paymentModelDao.getPaymentMethodId(), Optional.<PaymentPluginApi>absent());
-                                                       }
-                                                   }
-                                                   pluginApi = paymentMethodIdToPaymentPluginApi.get(paymentModelDao.getPaymentMethodId()).orNull();
-                                               }
-                                               final List<PaymentTransactionInfoPlugin> pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, tenantContext);
-                                               return toPayment(paymentModelDao.getId(), pluginInfo, withAttempts, internalTenantContext);
-                                           }
-                                       }
-                                      );
-        } catch (final PaymentApiException e) {
-            log.warn("Unable to get payments", e);
-            return new DefaultPagination<Payment>(offset, limit, null, null, ImmutableSet.<Payment>of().iterator());
-        }
-    }
-
-    public Pagination<Payment> getPayments(final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final PaymentPluginApi pluginApi = withPluginInfo ? getPaymentPluginApi(pluginName) : null;
-
-        return getEntityPagination(limit,
-                                   new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
-                                       @Override
-                                       public Pagination<PaymentModelDao> build() {
-                                           // Find all payments for all accounts
-                                           return paymentDao.getPayments(pluginName, offset, limit, internalTenantContext);
-                                       }
-                                   },
-                                   new Function<PaymentModelDao, Payment>() {
-                                       @Override
-                                       public Payment apply(final PaymentModelDao paymentModelDao) {
-                                           final List<PaymentTransactionInfoPlugin> pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, tenantContext);
-                                           return toPayment(paymentModelDao.getId(), pluginInfo, withAttempts, internalTenantContext);
-                                       }
-                                   }
-                                  );
-    }
-
-    public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) {
-        if (withPluginInfo) {
-            return getEntityPaginationFromPlugins(false,
-                                                  getAvailablePlugins(),
-                                                  offset,
-                                                  limit,
-                                                  new EntityPaginationBuilder<Payment, PaymentApiException>() {
-                                                      @Override
-                                                      public Pagination<Payment> build(final Long offset, final Long limit, final String pluginName) throws PaymentApiException {
-                                                          return searchPayments(searchKey, offset, limit, pluginName, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-                                                      }
-                                                  }
-                                                 );
-        } else {
-            try {
-                return getEntityPagination(limit,
-                                           new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
-                                               @Override
-                                               public Pagination<PaymentModelDao> build() {
-                                                   return paymentDao.searchPayments(searchKey, offset, limit, internalTenantContext);
-                                               }
-                                           },
-                                           new Function<PaymentModelDao, Payment>() {
-                                               @Override
-                                               public Payment apply(final PaymentModelDao paymentModelDao) {
-                                                   return toPayment(paymentModelDao.getId(), null, withAttempts, internalTenantContext);
-                                               }
-                                           }
-                                          );
-            } catch (final PaymentApiException e) {
-                log.warn("Unable to search through payments", e);
-                return new DefaultPagination<Payment>(offset, limit, null, null, ImmutableSet.<Payment>of().iterator());
-            }
-        }
-    }
-
-    public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo,
-                                              final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final PaymentPluginApi pluginApi = getPaymentPluginApi(pluginName);
-
-        return getEntityPagination(limit,
-                                   new SourcePaginationBuilder<PaymentTransactionInfoPlugin, PaymentApiException>() {
-                                       @Override
-                                       public Pagination<PaymentTransactionInfoPlugin> build() throws PaymentApiException {
-                                           try {
-                                               return pluginApi.searchPayments(searchKey, offset, limit, properties, tenantContext);
-                                           } catch (final PaymentPluginApiException e) {
-                                               throw new PaymentApiException(e, ErrorCode.PAYMENT_PLUGIN_SEARCH_PAYMENTS, pluginName, searchKey);
-                                           }
-                                       }
-
-                                   },
-                                   new Function<PaymentTransactionInfoPlugin, Payment>() {
-                                       final List<PaymentTransactionInfoPlugin> cachedPaymentTransactions = new LinkedList<PaymentTransactionInfoPlugin>();
-
-                                       @Override
-                                       public Payment apply(final PaymentTransactionInfoPlugin pluginTransaction) {
-                                           if (pluginTransaction.getKbPaymentId() == null) {
-                                               // Garbage from the plugin?
-                                               log.debug("Plugin {} returned a payment without a kbPaymentId for searchKey {}", pluginName, searchKey);
-                                               return null;
-                                           }
-
-                                           if (cachedPaymentTransactions.isEmpty() ||
-                                               (cachedPaymentTransactions.get(0).getKbPaymentId().equals(pluginTransaction.getKbPaymentId()))) {
-                                               cachedPaymentTransactions.add(pluginTransaction);
-                                               return null;
-                                           } else {
-                                               final Payment result = toPayment(pluginTransaction.getKbPaymentId(), withPluginInfo ? ImmutableList.<PaymentTransactionInfoPlugin>copyOf(cachedPaymentTransactions) : ImmutableList.<PaymentTransactionInfoPlugin>of(), withAttempts, internalTenantContext);
-                                               cachedPaymentTransactions.clear();
-                                               cachedPaymentTransactions.add(pluginTransaction);
-                                               return result;
-                                           }
-                                       }
-                                   }
-                                  );
-    }
-
     public void cancelScheduledPaymentTransaction(@Nullable final UUID paymentTransactionId, @Nullable final String paymentTransactionExternalKey, final CallContext callContext) throws PaymentApiException {
 
         final InternalCallContext internalCallContextWithoutAccountRecordId = internalCallContextFactory.createInternalCallContextWithoutAccountRecordId(callContext);
@@ -432,25 +206,6 @@ public class PaymentProcessor extends ProcessorBase {
         }
     }
 
-    public Payment getPaymentByTransactionId(final UUID transactionId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final PaymentTransactionModelDao paymentTransactionDao = paymentDao.getPaymentTransaction(transactionId, internalTenantContext);
-        if (null != paymentTransactionDao) {
-            final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentTransactionDao.getPaymentId(), internalTenantContext);
-            return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-        }
-        return null;
-    }
-
-    public Payment getPaymentByTransactionExternalKey(final String transactionExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
-        final List<PaymentTransactionModelDao> paymentTransactionDao = paymentDao.getPaymentTransactionsByExternalKey(transactionExternalKey, internalTenantContext);
-        if (paymentTransactionDao.isEmpty()) {
-            return null;
-        }
-        // All transactions must be on the same payment (see sanity in buildPaymentStateContext)
-        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentTransactionDao.get(0).getPaymentId(), internalTenantContext);
-        return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
-    }
-
     private Payment performOperation(final boolean isApiPayment,
                                      @Nullable final UUID attemptId,
                                      final TransactionType transactionType,
@@ -532,8 +287,8 @@ public class PaymentProcessor extends ProcessorBase {
             // prevent disallowed transitions in case the state couldn't be fixed (or if it's already in a final state).
             if (runJanitor) {
                 final PaymentPluginApi plugin = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, internalCallContext);
-                final List<PaymentTransactionInfoPlugin> pluginTransactions = getPaymentTransactionInfoPlugins(plugin, paymentModelDao, properties, callContext);
-                paymentModelDao = invokeJanitor(paymentModelDao, paymentTransactionsForCurrentPayment, pluginTransactions, internalCallContext);
+                final List<PaymentTransactionInfoPlugin> pluginTransactions = paymentRefresher.getPaymentTransactionInfoPlugins(plugin, paymentModelDao, properties, callContext);
+                paymentModelDao = paymentRefresher.invokeJanitor(paymentModelDao, paymentTransactionsForCurrentPayment, pluginTransactions, internalCallContext);
             }
 
             if (paymentStateContext.getPaymentTransactionExternalKey() != null) {
@@ -572,7 +327,7 @@ public class PaymentProcessor extends ProcessorBase {
 
         paymentAutomatonRunner.run(paymentStateContext, daoHelper, currentStateName, transactionType);
 
-        return getPayment(paymentStateContext.getPaymentModelDao(), true, false, properties, callContext, internalCallContext);
+        return paymentRefresher.getPayment(paymentStateContext.getPaymentModelDao(), true, false, properties, callContext, internalCallContext);
     }
 
     private void runSanityOnTransactionExternalKey(final Iterable<PaymentTransactionModelDao> allPaymentTransactionsForKey,
@@ -634,246 +389,4 @@ public class PaymentProcessor extends ProcessorBase {
         Preconditions.checkState(Iterables.size(completionCandidates) <= 1, "There should be at most one completion candidate");
         return Iterables.<PaymentTransactionModelDao>getLast(completionCandidates, null);
     }
-
-    // Used in bulk get API (getAccountPayments / getPayments)
-    private List<PaymentTransactionInfoPlugin> getPaymentTransactionInfoPluginsIfNeeded(@Nullable final PaymentPluginApi pluginApi, final PaymentModelDao paymentModelDao, final TenantContext context) {
-        if (pluginApi == null) {
-            return null;
-        }
-
-        try {
-            return getPaymentTransactionInfoPlugins(pluginApi, paymentModelDao, PLUGIN_PROPERTIES, context);
-        } catch (final PaymentApiException e) {
-            log.warn("Unable to retrieve plugin info for payment " + paymentModelDao.getId());
-            return null;
-        }
-    }
-
-    private List<PaymentTransactionInfoPlugin> getPaymentTransactionInfoPlugins(final PaymentPluginApi plugin, final PaymentModelDao paymentModelDao, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
-        try {
-            return plugin.getPaymentInfo(paymentModelDao.getAccountId(), paymentModelDao.getId(), properties, context);
-        } catch (final PaymentPluginApiException e) {
-            throw new PaymentApiException(e, ErrorCode.PAYMENT_PLUGIN_GET_PAYMENT_INFO, paymentModelDao.getId(), e.toString());
-        }
-    }
-
-    // Used in bulk get APIs (getPayments / searchPayments)
-    private Payment toPayment(final UUID paymentId, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions, final boolean withAttempts, final InternalTenantContext tenantContext) {
-        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentId, tenantContext);
-        if (paymentModelDao == null) {
-            log.warn("Unable to find payment id " + paymentId);
-            return null;
-        }
-
-        return toPayment(paymentModelDao, pluginTransactions, withAttempts, tenantContext);
-    }
-
-    // Used in single get APIs (getPayment / getPaymentByExternalKey)
-    private Payment toPayment(final PaymentModelDao paymentModelDao, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context, final InternalTenantContext tenantContext) throws PaymentApiException {
-        final PaymentPluginApi plugin = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, tenantContext);
-        final List<PaymentTransactionInfoPlugin> pluginTransactions = withPluginInfo ? getPaymentTransactionInfoPlugins(plugin, paymentModelDao, properties, context) : null;
-
-        return toPayment(paymentModelDao, pluginTransactions, withAttempts, tenantContext);
-    }
-
-    private Payment toPayment(final PaymentModelDao paymentModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions,
-                              final boolean withAttempts, final InternalTenantContext tenantContext) {
-        final InternalTenantContext tenantContextWithAccountRecordId = getInternalTenantContextWithAccountRecordId(paymentModelDao.getAccountId(), tenantContext);
-        final List<PaymentTransactionModelDao> transactionsForPayment = paymentDao.getTransactionsForPayment(paymentModelDao.getId(), tenantContextWithAccountRecordId);
-
-        return toPayment(paymentModelDao, transactionsForPayment, pluginTransactions, withAttempts, tenantContextWithAccountRecordId);
-    }
-
-    private PaymentModelDao invokeJanitor(final PaymentModelDao curPaymentModelDao, final Collection<PaymentTransactionModelDao> curTransactionsModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions, final InternalTenantContext internalTenantContext) {
-        // Need to filter for optimized codepaths looking up by account_record_id
-        final Iterable<PaymentTransactionModelDao> filteredTransactions = Iterables.filter(curTransactionsModelDao, new Predicate<PaymentTransactionModelDao>() {
-            @Override
-            public boolean apply(final PaymentTransactionModelDao curPaymentTransactionModelDao) {
-                return curPaymentTransactionModelDao.getPaymentId().equals(curPaymentModelDao.getId());
-            }
-        });
-
-        PaymentModelDao newPaymentModelDao = curPaymentModelDao;
-        final Collection<PaymentTransactionModelDao> transactionsModelDao = new LinkedList<PaymentTransactionModelDao>();
-        for (final PaymentTransactionModelDao curPaymentTransactionModelDao : filteredTransactions) {
-            PaymentTransactionModelDao newPaymentTransactionModelDao = curPaymentTransactionModelDao;
-
-            final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
-            if (paymentTransactionInfoPlugin != null) {
-                // Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
-                // See https://github.com/killbill/killbill/issues/341
-                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeededWithAccountLock(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
-                if (hasChanged) {
-                    newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
-                    newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
-                }
-            } else {
-                log.warn("Unable to find transaction={} from pluginTransactions={}", curPaymentTransactionModelDao, pluginTransactions);
-            }
-
-            transactionsModelDao.add(newPaymentTransactionModelDao);
-        }
-
-        curTransactionsModelDao.clear();
-        curTransactionsModelDao.addAll(transactionsModelDao);
-
-        return newPaymentModelDao;
-    }
-
-    // Used in bulk get API (getAccountPayments)
-    private Payment toPayment(final PaymentModelDao curPaymentModelDao, final Collection<PaymentTransactionModelDao> curTransactionsModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions, final boolean withAttempts, final InternalTenantContext internalTenantContext) {
-        final Collection<PaymentTransactionModelDao> transactionsModelDao = new LinkedList<PaymentTransactionModelDao>(curTransactionsModelDao);
-        invokeJanitor(curPaymentModelDao, transactionsModelDao, pluginTransactions, internalTenantContext);
-
-        final Collection<PaymentTransaction> transactions = new LinkedList<PaymentTransaction>();
-        for (final PaymentTransactionModelDao newPaymentTransactionModelDao : transactionsModelDao) {
-            final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
-            final PaymentTransaction transaction = new DefaultPaymentTransaction(newPaymentTransactionModelDao.getId(),
-                                                                                 newPaymentTransactionModelDao.getAttemptId(),
-                                                                                 newPaymentTransactionModelDao.getTransactionExternalKey(),
-                                                                                 newPaymentTransactionModelDao.getCreatedDate(),
-                                                                                 newPaymentTransactionModelDao.getUpdatedDate(),
-                                                                                 newPaymentTransactionModelDao.getPaymentId(),
-                                                                                 newPaymentTransactionModelDao.getTransactionType(),
-                                                                                 newPaymentTransactionModelDao.getEffectiveDate(),
-                                                                                 newPaymentTransactionModelDao.getTransactionStatus(),
-                                                                                 newPaymentTransactionModelDao.getAmount(),
-                                                                                 newPaymentTransactionModelDao.getCurrency(),
-                                                                                 newPaymentTransactionModelDao.getProcessedAmount(),
-                                                                                 newPaymentTransactionModelDao.getProcessedCurrency(),
-                                                                                 newPaymentTransactionModelDao.getGatewayErrorCode(),
-                                                                                 newPaymentTransactionModelDao.getGatewayErrorMsg(),
-                                                                                 paymentTransactionInfoPlugin);
-            transactions.add(transaction);
-        }
-
-        final Ordering<PaymentTransaction> perPaymentTransactionOrdering = Ordering.<PaymentTransaction>from(new Comparator<PaymentTransaction>() {
-            @Override
-            public int compare(final PaymentTransaction o1, final PaymentTransaction o2) {
-                return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
-            }
-        });
-        final List<PaymentTransaction> sortedTransactions = perPaymentTransactionOrdering.immutableSortedCopy(transactions);
-        return new DefaultPayment(curPaymentModelDao.getId(),
-                                  curPaymentModelDao.getCreatedDate(),
-                                  curPaymentModelDao.getUpdatedDate(),
-                                  curPaymentModelDao.getAccountId(),
-                                  curPaymentModelDao.getPaymentMethodId(),
-                                  curPaymentModelDao.getPaymentNumber(),
-                                  curPaymentModelDao.getExternalKey(),
-                                  sortedTransactions,
-                                  (withAttempts && !sortedTransactions.isEmpty()) ?
-                                  getPaymentAttempts(paymentDao.getPaymentAttempts(curPaymentModelDao.getExternalKey(), internalTenantContext),
-                                                     internalTenantContext) : null
-                                  );
-    }
-
-    private List<PaymentAttempt> getPaymentAttempts(final List<PaymentAttemptModelDao> pastPaymentAttempts,
-                                                    final InternalTenantContext internalTenantContext) {
-
-        List<PaymentAttempt> paymentAttempts = new ArrayList<PaymentAttempt>();
-
-        // Add Past Payment Attempts
-        for (PaymentAttemptModelDao pastPaymentAttempt : pastPaymentAttempts) {
-            DefaultPaymentAttempt paymentAttempt = new DefaultPaymentAttempt(
-                    pastPaymentAttempt.getAccountId(),
-                    pastPaymentAttempt.getPaymentMethodId(),
-                    pastPaymentAttempt.getId(),
-                    pastPaymentAttempt.getCreatedDate(),
-                    pastPaymentAttempt.getUpdatedDate(),
-                    pastPaymentAttempt.getCreatedDate(),
-                    pastPaymentAttempt.getPaymentExternalKey(),
-                    pastPaymentAttempt.getTransactionId(),
-                    pastPaymentAttempt.getTransactionExternalKey(),
-                    pastPaymentAttempt.getTransactionType(),
-                    pastPaymentAttempt.getStateName(),
-                    pastPaymentAttempt.getAmount(),
-                    pastPaymentAttempt.getCurrency(),
-                    pastPaymentAttempt.getPluginName(),
-                    buildPluginProperties(pastPaymentAttempt));
-            paymentAttempts.add(paymentAttempt);
-        }
-
-        // Get Future Payment Attempts from Notification Queue and add them to the list
-        try {
-            final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
-            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
-                    retryQueue.getFutureNotificationForSearchKeys(internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
-
-            for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
-                // Last Attempt
-                PaymentAttemptModelDao lastPaymentAttempt = getLastPaymentAttempt(pastPaymentAttempts,
-                                                                                  ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId());
-
-                if (lastPaymentAttempt != null) {
-                    DefaultPaymentAttempt futurePaymentAttempt = new DefaultPaymentAttempt(
-                            lastPaymentAttempt.getAccountId(), // accountId
-                            lastPaymentAttempt.getPaymentMethodId(), // paymentMethodId
-                            ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId(), // id
-                            null, // createdDate
-                            null, // updatedDate
-                            notificationEvent.getEffectiveDate(), // effectiveDate
-                            lastPaymentAttempt.getPaymentExternalKey(), // paymentExternalKey
-                            null, // transactionId
-                            lastPaymentAttempt.getTransactionExternalKey(), // transactionExternalKey
-                            lastPaymentAttempt.getTransactionType(), // transactionType
-                            SCHEDULED, // stateName
-                            lastPaymentAttempt.getAmount(), // amount
-                            lastPaymentAttempt.getCurrency(), // currency
-                            ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getPaymentControlPluginNames().get(0), // pluginName,
-                            buildPluginProperties(lastPaymentAttempt)); // pluginProperties
-                    paymentAttempts.add(futurePaymentAttempt);
-                }
-            }
-        } catch (NoSuchNotificationQueue noSuchNotificationQueue) {
-            log.error("ERROR Loading Notification Queue - " + noSuchNotificationQueue.getMessage());
-        }
-        return paymentAttempts;
-    }
-
-    private PaymentAttemptModelDao getLastPaymentAttempt(final List<PaymentAttemptModelDao> pastPaymentAttempts, final UUID attemptId) {
-        if (!pastPaymentAttempts.isEmpty()) {
-            for (int i = pastPaymentAttempts.size() - 1; i >= 0; i--) {
-                if (pastPaymentAttempts.get(i).getId().equals(attemptId)) {
-                    return pastPaymentAttempts.get(i);
-                }
-            }
-        }
-        return null;
-    }
-
-    private List<PluginProperty> buildPluginProperties(final PaymentAttemptModelDao pastPaymentAttempt) {
-        if (pastPaymentAttempt.getPluginProperties() != null) {
-            try {
-                return Lists.newArrayList(PluginPropertySerializer.deserialize(pastPaymentAttempt.getPluginProperties()));
-            } catch (PluginPropertySerializerException e) {
-                log.error("ERROR Deserializing Plugin Properties - " + e.getMessage());
-            }
-        }
-        return null;
-    }
-
-    private PaymentTransactionInfoPlugin findPaymentTransactionInfoPlugin(final PaymentTransactionModelDao paymentTransactionModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions) {
-        if (pluginTransactions == null) {
-            return null;
-        }
-
-        return Iterables.tryFind(pluginTransactions,
-                                 new Predicate<PaymentTransactionInfoPlugin>() {
-                                     @Override
-                                     public boolean apply(final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin) {
-                                         return paymentTransactionModelDao.getId().equals(paymentTransactionInfoPlugin.getKbTransactionPaymentId());
-                                     }
-                                 }).orNull();
-    }
-
-    private InternalTenantContext getInternalTenantContextWithAccountRecordId(final UUID accountId, final InternalTenantContext tenantContext) {
-        final InternalTenantContext tenantContextWithAccountRecordId;
-        if (tenantContext.getAccountRecordId() == null) {
-            tenantContextWithAccountRecordId = internalCallContextFactory.createInternalTenantContext(accountId, tenantContext);
-        } else {
-            tenantContextWithAccountRecordId = tenantContext;
-        }
-        return tenantContextWithAccountRecordId;
-    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentRefresher.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentRefresher.java
new file mode 100644
index 0000000..359dcfd
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentRefresher.java
@@ -0,0 +1,579 @@
+/*
+ * Copyright 2014-2018 Groupon, Inc
+ * Copyright 2014-2018 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import org.killbill.billing.ErrorCode;
+import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.payment.api.DefaultPayment;
+import org.killbill.billing.payment.api.DefaultPaymentAttempt;
+import org.killbill.billing.payment.api.DefaultPaymentTransaction;
+import org.killbill.billing.payment.api.Payment;
+import org.killbill.billing.payment.api.PaymentApiException;
+import org.killbill.billing.payment.api.PaymentAttempt;
+import org.killbill.billing.payment.api.PaymentTransaction;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
+import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
+import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.dao.PluginPropertySerializer;
+import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApiException;
+import org.killbill.billing.payment.plugin.api.PaymentTransactionInfoPlugin;
+import org.killbill.billing.payment.retry.DefaultRetryService;
+import org.killbill.billing.payment.retry.PaymentRetryNotificationKey;
+import org.killbill.billing.tag.TagInternalApi;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.TenantContext;
+import org.killbill.billing.util.entity.DefaultPagination;
+import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.DefaultPaginationHelper.EntityPaginationBuilder;
+import org.killbill.billing.util.entity.dao.DefaultPaginationHelper.SourcePaginationBuilder;
+import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+
+import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPagination;
+import static org.killbill.billing.util.entity.dao.DefaultPaginationHelper.getEntityPaginationFromPlugins;
+
+public class PaymentRefresher extends ProcessorBase {
+
+    private static final Logger log = LoggerFactory.getLogger(PaymentRefresher.class);
+
+    private static final String SCHEDULED = "SCHEDULED";
+    private static final ImmutableList<PluginProperty> PLUGIN_PROPERTIES = ImmutableList.<PluginProperty>of();
+
+    private final NotificationQueueService notificationQueueService;
+    private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+
+    @Inject
+    public PaymentRefresher(final PaymentPluginServiceRegistration paymentPluginServiceRegistration,
+                            final AccountInternalApi accountUserApi,
+                            final PaymentDao paymentDao,
+                            final TagInternalApi tagUserApi,
+                            final GlobalLocker locker,
+                            final InternalCallContextFactory internalCallContextFactory,
+                            final InvoiceInternalApi invoiceApi,
+                            final Clock clock,
+                            final NotificationQueueService notificationQueueService,
+                            final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
+        super(paymentPluginServiceRegistration, accountUserApi, paymentDao, tagUserApi, locker, internalCallContextFactory, invoiceApi, clock);
+        this.notificationQueueService = notificationQueueService;
+        this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
+    }
+
+    protected void onJanitorChange(final PaymentTransactionModelDao curPaymentTransactionModelDao,
+                                   final InternalTenantContext internalTenantContext) {}
+
+    public PaymentModelDao invokeJanitor(final PaymentModelDao curPaymentModelDao,
+                                         final Collection<PaymentTransactionModelDao> curTransactionsModelDao,
+                                         @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions,
+                                         final InternalTenantContext internalTenantContext) {
+        PaymentModelDao newPaymentModelDao = curPaymentModelDao;
+        final Collection<PaymentTransactionModelDao> transactionsModelDao = new LinkedList<PaymentTransactionModelDao>();
+        for (final PaymentTransactionModelDao curPaymentTransactionModelDao : curTransactionsModelDao) {
+            PaymentTransactionModelDao newPaymentTransactionModelDao = curPaymentTransactionModelDao;
+
+            final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
+            if (paymentTransactionInfoPlugin != null) {
+                // Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
+                // See https://github.com/killbill/killbill/issues/341
+                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeededWithAccountLock(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
+                if (hasChanged) {
+                    newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
+                    newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
+
+                    onJanitorChange(curPaymentTransactionModelDao, internalTenantContext);
+                }
+            } else {
+                log.warn("Unable to find transaction={} from pluginTransactions={}", curPaymentTransactionModelDao, pluginTransactions);
+            }
+
+            transactionsModelDao.add(newPaymentTransactionModelDao);
+        }
+
+        curTransactionsModelDao.clear();
+        curTransactionsModelDao.addAll(transactionsModelDao);
+
+        return newPaymentModelDao;
+    }
+
+    private PaymentTransactionInfoPlugin findPaymentTransactionInfoPlugin(final PaymentTransactionModelDao paymentTransactionModelDao,
+                                                                          @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions) {
+        if (pluginTransactions == null) {
+            return null;
+        }
+
+        return Iterables.tryFind(pluginTransactions,
+                                 new Predicate<PaymentTransactionInfoPlugin>() {
+                                     @Override
+                                     public boolean apply(final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin) {
+                                         return paymentTransactionModelDao.getId().equals(paymentTransactionInfoPlugin.getKbTransactionPaymentId());
+                                     }
+                                 }).orNull();
+    }
+
+    public List<Payment> getAccountPayments(final UUID accountId, final boolean withPluginInfo, final boolean withAttempts, final TenantContext context, final InternalTenantContext tenantContext) throws PaymentApiException {
+        final List<PaymentModelDao> paymentsModelDao = paymentDao.getPaymentsForAccount(accountId, tenantContext);
+        final List<PaymentTransactionModelDao> transactionsModelDao = paymentDao.getTransactionsForAccount(accountId, tenantContext);
+
+        final Map<UUID, PaymentPluginApi> paymentPluginByPaymentMethodId = new HashMap<UUID, PaymentPluginApi>();
+        final Collection<UUID> absentPlugins = new HashSet<UUID>();
+        final List<Payment> transformedPayments = Lists.<PaymentModelDao, Payment>transform(paymentsModelDao,
+                                                                                            new Function<PaymentModelDao, Payment>() {
+                                                                                                @Override
+                                                                                                public Payment apply(final PaymentModelDao paymentModelDao) {
+                                                                                                    List<PaymentTransactionInfoPlugin> pluginInfo = null;
+
+                                                                                                    if (withPluginInfo) {
+                                                                                                        PaymentPluginApi pluginApi = paymentPluginByPaymentMethodId.get(paymentModelDao.getPaymentMethodId());
+                                                                                                        if (pluginApi == null && !absentPlugins.contains(paymentModelDao.getPaymentMethodId())) {
+                                                                                                            try {
+                                                                                                                pluginApi = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, tenantContext);
+                                                                                                                paymentPluginByPaymentMethodId.put(paymentModelDao.getPaymentMethodId(), pluginApi);
+                                                                                                            } catch (final PaymentApiException e) {
+                                                                                                                log.warn("Unable to retrieve pluginApi for payment method " + paymentModelDao.getPaymentMethodId());
+                                                                                                                absentPlugins.add(paymentModelDao.getPaymentMethodId());
+                                                                                                            }
+                                                                                                        }
+
+                                                                                                        pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, context);
+                                                                                                    }
+
+                                                                                                    return toPayment(paymentModelDao, transactionsModelDao, pluginInfo, withAttempts, tenantContext);
+                                                                                                }
+                                                                                            });
+
+        // Copy the transformed list, so the transformation function is applied once (otherwise, the Janitor could be invoked multiple times)
+        return ImmutableList.<Payment>copyOf(transformedPayments);
+    }
+
+    public Payment getPayment(final UUID paymentId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentId, internalTenantContext);
+        return getPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+    }
+
+    public Payment getPaymentByExternalKey(final String paymentExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final PaymentModelDao paymentModelDao = paymentDao.getPaymentByExternalKey(paymentExternalKey, internalTenantContext);
+        return getPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+    }
+
+    public Payment getPaymentByTransactionId(final UUID transactionId, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final PaymentTransactionModelDao paymentTransactionDao = paymentDao.getPaymentTransaction(transactionId, internalTenantContext);
+        if (null != paymentTransactionDao) {
+            final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentTransactionDao.getPaymentId(), internalTenantContext);
+            return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+        }
+        return null;
+    }
+
+    public Payment getPaymentByTransactionExternalKey(final String transactionExternalKey, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final List<PaymentTransactionModelDao> paymentTransactionDao = paymentDao.getPaymentTransactionsByExternalKey(transactionExternalKey, internalTenantContext);
+        if (paymentTransactionDao.isEmpty()) {
+            return null;
+        }
+        // All transactions must be on the same payment (see sanity in buildPaymentStateContext)
+        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentTransactionDao.get(0).getPaymentId(), internalTenantContext);
+        return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+    }
+
+    protected Payment getPayment(final PaymentModelDao paymentModelDao, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        if (paymentModelDao == null) {
+            return null;
+        }
+        return toPayment(paymentModelDao, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+    }
+
+    public Pagination<Payment> getPayments(final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts,
+                                           final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) {
+        final Map<UUID, Optional<PaymentPluginApi>> paymentMethodIdToPaymentPluginApi = new HashMap<UUID, Optional<PaymentPluginApi>>();
+
+        try {
+            return getEntityPagination(limit,
+                                       new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
+                                           @Override
+                                           public Pagination<PaymentModelDao> build() {
+                                               // Find all payments for all accounts
+                                               return paymentDao.get(offset, limit, internalTenantContext);
+                                           }
+                                       },
+                                       new Function<PaymentModelDao, Payment>() {
+                                           @Override
+                                           public Payment apply(final PaymentModelDao paymentModelDao) {
+                                               final PaymentPluginApi pluginApi;
+                                               if (!withPluginInfo) {
+                                                   pluginApi = null;
+                                               } else {
+                                                   if (paymentMethodIdToPaymentPluginApi.get(paymentModelDao.getPaymentMethodId()) == null) {
+                                                       try {
+                                                           final PaymentPluginApi paymentProviderPlugin = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, internalTenantContext);
+                                                           paymentMethodIdToPaymentPluginApi.put(paymentModelDao.getPaymentMethodId(), Optional.<PaymentPluginApi>of(paymentProviderPlugin));
+                                                       } catch (final PaymentApiException e) {
+                                                           log.warn("Unable to retrieve PaymentPluginApi for paymentMethodId='{}'", paymentModelDao.getPaymentMethodId(), e);
+                                                           // We use Optional to avoid printing the log line for each result
+                                                           paymentMethodIdToPaymentPluginApi.put(paymentModelDao.getPaymentMethodId(), Optional.<PaymentPluginApi>absent());
+                                                       }
+                                                   }
+                                                   pluginApi = paymentMethodIdToPaymentPluginApi.get(paymentModelDao.getPaymentMethodId()).orNull();
+                                               }
+                                               final List<PaymentTransactionInfoPlugin> pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, tenantContext);
+                                               return toPayment(paymentModelDao.getId(), pluginInfo, withAttempts, internalTenantContext);
+                                           }
+                                       }
+                                      );
+        } catch (final PaymentApiException e) {
+            log.warn("Unable to get payments", e);
+            return new DefaultPagination<Payment>(offset, limit, null, null, ImmutableSet.<Payment>of().iterator());
+        }
+    }
+
+    public Pagination<Payment> getPayments(final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final PaymentPluginApi pluginApi = withPluginInfo ? getPaymentPluginApi(pluginName) : null;
+
+        return getEntityPagination(limit,
+                                   new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
+                                       @Override
+                                       public Pagination<PaymentModelDao> build() {
+                                           // Find all payments for all accounts
+                                           return paymentDao.getPayments(pluginName, offset, limit, internalTenantContext);
+                                       }
+                                   },
+                                   new Function<PaymentModelDao, Payment>() {
+                                       @Override
+                                       public Payment apply(final PaymentModelDao paymentModelDao) {
+                                           final List<PaymentTransactionInfoPlugin> pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, tenantContext);
+                                           return toPayment(paymentModelDao.getId(), pluginInfo, withAttempts, internalTenantContext);
+                                       }
+                                   }
+                                  );
+    }
+
+    public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) {
+        if (withPluginInfo) {
+            return getEntityPaginationFromPlugins(false,
+                                                  getAvailablePlugins(),
+                                                  offset,
+                                                  limit,
+                                                  new EntityPaginationBuilder<Payment, PaymentApiException>() {
+                                                      @Override
+                                                      public Pagination<Payment> build(final Long offset, final Long limit, final String pluginName) throws PaymentApiException {
+                                                          return searchPayments(searchKey, offset, limit, pluginName, withPluginInfo, withAttempts, properties, tenantContext, internalTenantContext);
+                                                      }
+                                                  }
+                                                 );
+        } else {
+            try {
+                return getEntityPagination(limit,
+                                           new SourcePaginationBuilder<PaymentModelDao, PaymentApiException>() {
+                                               @Override
+                                               public Pagination<PaymentModelDao> build() {
+                                                   return paymentDao.searchPayments(searchKey, offset, limit, internalTenantContext);
+                                               }
+                                           },
+                                           new Function<PaymentModelDao, Payment>() {
+                                               @Override
+                                               public Payment apply(final PaymentModelDao paymentModelDao) {
+                                                   return toPayment(paymentModelDao.getId(), null, withAttempts, internalTenantContext);
+                                               }
+                                           }
+                                          );
+            } catch (final PaymentApiException e) {
+                log.warn("Unable to search through payments", e);
+                return new DefaultPagination<Payment>(offset, limit, null, null, ImmutableSet.<Payment>of().iterator());
+            }
+        }
+    }
+
+    public Pagination<Payment> searchPayments(final String searchKey, final Long offset, final Long limit, final String pluginName, final boolean withPluginInfo,
+                                              final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext tenantContext, final InternalTenantContext internalTenantContext) throws PaymentApiException {
+        final PaymentPluginApi pluginApi = getPaymentPluginApi(pluginName);
+
+        return getEntityPagination(limit,
+                                   new SourcePaginationBuilder<PaymentTransactionInfoPlugin, PaymentApiException>() {
+                                       @Override
+                                       public Pagination<PaymentTransactionInfoPlugin> build() throws PaymentApiException {
+                                           try {
+                                               return pluginApi.searchPayments(searchKey, offset, limit, properties, tenantContext);
+                                           } catch (final PaymentPluginApiException e) {
+                                               throw new PaymentApiException(e, ErrorCode.PAYMENT_PLUGIN_SEARCH_PAYMENTS, pluginName, searchKey);
+                                           }
+                                       }
+
+                                   },
+                                   new Function<PaymentTransactionInfoPlugin, Payment>() {
+                                       final List<PaymentTransactionInfoPlugin> cachedPaymentTransactions = new LinkedList<PaymentTransactionInfoPlugin>();
+
+                                       @Override
+                                       public Payment apply(final PaymentTransactionInfoPlugin pluginTransaction) {
+                                           if (pluginTransaction.getKbPaymentId() == null) {
+                                               // Garbage from the plugin?
+                                               log.debug("Plugin {} returned a payment without a kbPaymentId for searchKey {}", pluginName, searchKey);
+                                               return null;
+                                           }
+
+                                           if (cachedPaymentTransactions.isEmpty() ||
+                                               (cachedPaymentTransactions.get(0).getKbPaymentId().equals(pluginTransaction.getKbPaymentId()))) {
+                                               cachedPaymentTransactions.add(pluginTransaction);
+                                               return null;
+                                           } else {
+                                               final Payment result = toPayment(pluginTransaction.getKbPaymentId(), withPluginInfo ? ImmutableList.<PaymentTransactionInfoPlugin>copyOf(cachedPaymentTransactions) : ImmutableList.<PaymentTransactionInfoPlugin>of(), withAttempts, internalTenantContext);
+                                               cachedPaymentTransactions.clear();
+                                               cachedPaymentTransactions.add(pluginTransaction);
+                                               return result;
+                                           }
+                                       }
+                                   }
+                                  );
+    }
+
+    // Used in bulk get APIs (getPayments / searchPayments)
+    private Payment toPayment(final UUID paymentId,
+                              @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions,
+                              final boolean withAttempts,
+                              final InternalTenantContext tenantContext) {
+        final PaymentModelDao paymentModelDao = paymentDao.getPayment(paymentId, tenantContext);
+        if (paymentModelDao == null) {
+            log.warn("Unable to find payment id " + paymentId);
+            return null;
+        }
+
+        return toPayment(paymentModelDao, pluginTransactions, withAttempts, tenantContext);
+    }
+
+    // Used in single get APIs (getPayment / getPaymentByExternalKey)
+    private Payment toPayment(final PaymentModelDao paymentModelDao, final boolean withPluginInfo, final boolean withAttempts, final Iterable<PluginProperty> properties, final TenantContext context, final InternalTenantContext tenantContext) throws PaymentApiException {
+        final PaymentPluginApi plugin = getPaymentProviderPlugin(paymentModelDao.getPaymentMethodId(), true, tenantContext);
+        final List<PaymentTransactionInfoPlugin> pluginTransactions = withPluginInfo ? getPaymentTransactionInfoPlugins(plugin, paymentModelDao, properties, context) : null;
+
+        return toPayment(paymentModelDao, pluginTransactions, withAttempts, tenantContext);
+    }
+
+    private Payment toPayment(final PaymentModelDao paymentModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions,
+                              final boolean withAttempts, final InternalTenantContext tenantContext) {
+        final InternalTenantContext tenantContextWithAccountRecordId = getInternalTenantContextWithAccountRecordId(paymentModelDao.getAccountId(), tenantContext);
+        final List<PaymentTransactionModelDao> transactionsForPayment = paymentDao.getTransactionsForPayment(paymentModelDao.getId(), tenantContextWithAccountRecordId);
+
+        return toPayment(paymentModelDao, transactionsForPayment, pluginTransactions, withAttempts, tenantContextWithAccountRecordId);
+    }
+
+    // Used in bulk get API (getAccountPayments)
+    private Payment toPayment(final PaymentModelDao curPaymentModelDao, final Collection<PaymentTransactionModelDao> allTransactionsModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions, final boolean withAttempts, final InternalTenantContext internalTenantContext) {
+        // Need to filter for optimized codepaths looking up by account_record_id
+        final Collection<PaymentTransactionModelDao> transactionsModelDao = new LinkedList<PaymentTransactionModelDao>(Collections2.filter(allTransactionsModelDao, new Predicate<PaymentTransactionModelDao>() {
+            @Override
+            public boolean apply(final PaymentTransactionModelDao curPaymentTransactionModelDao) {
+                return curPaymentTransactionModelDao.getPaymentId().equals(curPaymentModelDao.getId());
+            }
+        }));
+
+        if (pluginTransactions != null) {
+            invokeJanitor(curPaymentModelDao, transactionsModelDao, pluginTransactions, internalTenantContext);
+        }
+
+        final Collection<PaymentTransaction> transactions = new LinkedList<PaymentTransaction>();
+        for (final PaymentTransactionModelDao newPaymentTransactionModelDao : transactionsModelDao) {
+            final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
+            final PaymentTransaction transaction = new DefaultPaymentTransaction(newPaymentTransactionModelDao.getId(),
+                                                                                 newPaymentTransactionModelDao.getAttemptId(),
+                                                                                 newPaymentTransactionModelDao.getTransactionExternalKey(),
+                                                                                 newPaymentTransactionModelDao.getCreatedDate(),
+                                                                                 newPaymentTransactionModelDao.getUpdatedDate(),
+                                                                                 newPaymentTransactionModelDao.getPaymentId(),
+                                                                                 newPaymentTransactionModelDao.getTransactionType(),
+                                                                                 newPaymentTransactionModelDao.getEffectiveDate(),
+                                                                                 newPaymentTransactionModelDao.getTransactionStatus(),
+                                                                                 newPaymentTransactionModelDao.getAmount(),
+                                                                                 newPaymentTransactionModelDao.getCurrency(),
+                                                                                 newPaymentTransactionModelDao.getProcessedAmount(),
+                                                                                 newPaymentTransactionModelDao.getProcessedCurrency(),
+                                                                                 newPaymentTransactionModelDao.getGatewayErrorCode(),
+                                                                                 newPaymentTransactionModelDao.getGatewayErrorMsg(),
+                                                                                 paymentTransactionInfoPlugin);
+            transactions.add(transaction);
+        }
+
+        final Ordering<PaymentTransaction> perPaymentTransactionOrdering = Ordering.<PaymentTransaction>from(new Comparator<PaymentTransaction>() {
+            @Override
+            public int compare(final PaymentTransaction o1, final PaymentTransaction o2) {
+                return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+            }
+        });
+        final List<PaymentTransaction> sortedTransactions = perPaymentTransactionOrdering.immutableSortedCopy(transactions);
+        return new DefaultPayment(curPaymentModelDao.getId(),
+                                  curPaymentModelDao.getCreatedDate(),
+                                  curPaymentModelDao.getUpdatedDate(),
+                                  curPaymentModelDao.getAccountId(),
+                                  curPaymentModelDao.getPaymentMethodId(),
+                                  curPaymentModelDao.getPaymentNumber(),
+                                  curPaymentModelDao.getExternalKey(),
+                                  sortedTransactions,
+                                  (withAttempts && !sortedTransactions.isEmpty()) ?
+                                  getPaymentAttempts(paymentDao.getPaymentAttempts(curPaymentModelDao.getExternalKey(), internalTenantContext),
+                                                     internalTenantContext) : null
+        );
+    }
+
+    private List<PaymentAttempt> getPaymentAttempts(final List<PaymentAttemptModelDao> pastPaymentAttempts,
+                                                    final InternalTenantContext internalTenantContext) {
+
+        final List<PaymentAttempt> paymentAttempts = new ArrayList<PaymentAttempt>();
+
+        // Add Past Payment Attempts
+        for (final PaymentAttemptModelDao pastPaymentAttempt : pastPaymentAttempts) {
+            final PaymentAttempt paymentAttempt = new DefaultPaymentAttempt(pastPaymentAttempt.getAccountId(),
+                                                                            pastPaymentAttempt.getPaymentMethodId(),
+                                                                            pastPaymentAttempt.getId(),
+                                                                            pastPaymentAttempt.getCreatedDate(),
+                                                                            pastPaymentAttempt.getUpdatedDate(),
+                                                                            pastPaymentAttempt.getCreatedDate(),
+                                                                            pastPaymentAttempt.getPaymentExternalKey(),
+                                                                            pastPaymentAttempt.getTransactionId(),
+                                                                            pastPaymentAttempt.getTransactionExternalKey(),
+                                                                            pastPaymentAttempt.getTransactionType(),
+                                                                            pastPaymentAttempt.getStateName(),
+                                                                            pastPaymentAttempt.getAmount(),
+                                                                            pastPaymentAttempt.getCurrency(),
+                                                                            pastPaymentAttempt.getPluginName(),
+                                                                            buildPluginProperties(pastPaymentAttempt));
+            paymentAttempts.add(paymentAttempt);
+        }
+
+        // Get Future Payment Attempts from Notification Queue and add them to the list
+        try {
+            final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+                    retryQueue.getFutureNotificationForSearchKeys(internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
+
+            for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
+                // Last Attempt
+                final PaymentAttemptModelDao lastPaymentAttempt = getLastPaymentAttempt(pastPaymentAttempts,
+                                                                                        ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId());
+
+                if (lastPaymentAttempt != null) {
+                    final PaymentAttempt futurePaymentAttempt = new DefaultPaymentAttempt(lastPaymentAttempt.getAccountId(), // accountId
+                                                                                          lastPaymentAttempt.getPaymentMethodId(), // paymentMethodId
+                                                                                          ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId(), // id
+                                                                                          null, // createdDate
+                                                                                          null, // updatedDate
+                                                                                          notificationEvent.getEffectiveDate(), // effectiveDate
+                                                                                          lastPaymentAttempt.getPaymentExternalKey(), // paymentExternalKey
+                                                                                          null, // transactionId
+                                                                                          lastPaymentAttempt.getTransactionExternalKey(), // transactionExternalKey
+                                                                                          lastPaymentAttempt.getTransactionType(), // transactionType
+                                                                                          SCHEDULED, // stateName
+                                                                                          lastPaymentAttempt.getAmount(), // amount
+                                                                                          lastPaymentAttempt.getCurrency(), // currency
+                                                                                          ((PaymentRetryNotificationKey) notificationEvent.getEvent()).getPaymentControlPluginNames().get(0), // pluginName,
+                                                                                          buildPluginProperties(lastPaymentAttempt)); // pluginProperties
+                    paymentAttempts.add(futurePaymentAttempt);
+                }
+            }
+        } catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
+            log.error("ERROR Loading Notification Queue - " + noSuchNotificationQueue.getMessage());
+        }
+        return paymentAttempts;
+    }
+
+    private PaymentAttemptModelDao getLastPaymentAttempt(final List<PaymentAttemptModelDao> pastPaymentAttempts, final UUID attemptId) {
+        if (!pastPaymentAttempts.isEmpty()) {
+            for (int i = pastPaymentAttempts.size() - 1; i >= 0; i--) {
+                if (pastPaymentAttempts.get(i).getId().equals(attemptId)) {
+                    return pastPaymentAttempts.get(i);
+                }
+            }
+        }
+        return null;
+    }
+
+    private List<PluginProperty> buildPluginProperties(final PaymentAttemptModelDao pastPaymentAttempt) {
+        if (pastPaymentAttempt.getPluginProperties() != null) {
+            try {
+                return Lists.newArrayList(PluginPropertySerializer.deserialize(pastPaymentAttempt.getPluginProperties()));
+            } catch (final PluginPropertySerializerException e) {
+                log.error("ERROR Deserializing Plugin Properties - " + e.getMessage());
+            }
+        }
+        return null;
+    }
+
+    private InternalTenantContext getInternalTenantContextWithAccountRecordId(final UUID accountId, final InternalTenantContext tenantContext) {
+        final InternalTenantContext tenantContextWithAccountRecordId;
+        if (tenantContext.getAccountRecordId() == null) {
+            tenantContextWithAccountRecordId = internalCallContextFactory.createInternalTenantContext(accountId, tenantContext);
+        } else {
+            tenantContextWithAccountRecordId = tenantContext;
+        }
+        return tenantContextWithAccountRecordId;
+    }
+
+    // Used in bulk get API (getAccountPayments / getPayments)
+    private List<PaymentTransactionInfoPlugin> getPaymentTransactionInfoPluginsIfNeeded(@Nullable final PaymentPluginApi pluginApi, final PaymentModelDao paymentModelDao, final TenantContext context) {
+        if (pluginApi == null) {
+            return null;
+        }
+
+        try {
+            return getPaymentTransactionInfoPlugins(pluginApi, paymentModelDao, PLUGIN_PROPERTIES, context);
+        } catch (final PaymentApiException e) {
+            log.warn("Unable to retrieve plugin info for payment " + paymentModelDao.getId());
+            return null;
+        }
+    }
+
+    List<PaymentTransactionInfoPlugin> getPaymentTransactionInfoPlugins(final PaymentPluginApi plugin, final PaymentModelDao paymentModelDao, final Iterable<PluginProperty> properties, final TenantContext context) throws PaymentApiException {
+        try {
+            return plugin.getPaymentInfo(paymentModelDao.getAccountId(), paymentModelDao.getId(), properties, context);
+        } catch (final PaymentPluginApiException e) {
+            throw new PaymentApiException(e, ErrorCode.PAYMENT_PLUGIN_GET_PAYMENT_INFO, paymentModelDao.getId(), e.toString());
+        }
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/control/CompletionControlOperation.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/control/CompletionControlOperation.java
index 91fea81..74b4ffe 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/control/CompletionControlOperation.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/control/CompletionControlOperation.java
@@ -28,6 +28,7 @@ import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.PaymentRefresher;
 import org.killbill.billing.payment.core.ProcessorBase.DispatcherCallback;
 import org.killbill.billing.payment.core.sm.control.ControlPluginRunner.DefaultPaymentControlContext;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
@@ -50,13 +51,17 @@ public class CompletionControlOperation extends OperationControlCallback {
 
     private static final Joiner JOINER = Joiner.on(", ");
 
+    private final PaymentRefresher paymentRefresher;
+
     public CompletionControlOperation(final GlobalLocker locker,
                                       final PluginDispatcher<OperationResult> paymentPluginDispatcher,
                                       final PaymentConfig paymentConfig,
                                       final PaymentStateControlContext paymentStateContext,
+                                      final PaymentRefresher paymentRefresher,
                                       final PaymentProcessor paymentProcessor,
                                       final ControlPluginRunner controlPluginRunner) {
         super(locker, paymentPluginDispatcher, paymentStateContext, paymentProcessor, paymentConfig, controlPluginRunner);
+        this.paymentRefresher = paymentRefresher;
     }
 
     @Override
@@ -115,6 +120,6 @@ public class CompletionControlOperation extends OperationControlCallback {
 
     @Override
     protected Payment doCallSpecificOperationCallback() throws PaymentApiException {
-        return paymentProcessor.getPayment(paymentStateContext.getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), paymentStateContext.getCallContext(), paymentStateContext.getInternalCallContext());
+        return paymentRefresher.getPayment(paymentStateContext.getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), paymentStateContext.getCallContext(), paymentStateContext.getInternalCallContext());
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
index 5be8359..ff64641 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PluginControlPaymentAutomatonRunner.java
@@ -46,6 +46,7 @@ import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.payment.core.PaymentExecutors;
 import org.killbill.billing.payment.core.PaymentPluginServiceRegistration;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.PaymentRefresher;
 import org.killbill.billing.payment.core.sm.control.AuthorizeControlOperation;
 import org.killbill.billing.payment.core.sm.control.CaptureControlOperation;
 import org.killbill.billing.payment.core.sm.control.ChargebackControlOperation;
@@ -94,12 +95,13 @@ public class PluginControlPaymentAutomatonRunner extends PaymentAutomatonRunner 
     private final PaymentControlStateMachineHelper paymentControlStateMachineHelper;
     private final ControlPluginRunner controlPluginRunner;
     private final PaymentConfig paymentConfig;
+    private final PaymentRefresher paymentRefresher;
 
     @Inject
     public PluginControlPaymentAutomatonRunner(final PaymentDao paymentDao, final GlobalLocker locker, final PaymentPluginServiceRegistration paymentPluginServiceRegistration,
                                                final OSGIServiceRegistration<PaymentControlPluginApi> paymentControlPluginRegistry, final Clock clock, final PaymentProcessor paymentProcessor, @Named(RETRYABLE_NAMED) final RetryServiceScheduler retryServiceScheduler,
                                                final PaymentConfig paymentConfig, final PaymentExecutors executors, final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper paymentControlStateMachineHelper,
-                                               final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus) {
+                                               final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus, final PaymentRefresher paymentRefresher) {
         super(paymentConfig, paymentDao, locker, paymentPluginServiceRegistration, clock, executors, eventBus, paymentSMHelper);
         this.paymentProcessor = paymentProcessor;
         this.paymentControlPluginRegistry = paymentControlPluginRegistry;
@@ -107,6 +109,7 @@ public class PluginControlPaymentAutomatonRunner extends PaymentAutomatonRunner 
         this.paymentControlStateMachineHelper = paymentControlStateMachineHelper;
         this.controlPluginRunner = controlPluginRunner;
         this.paymentConfig = paymentConfig;
+        this.paymentRefresher = paymentRefresher;
     }
 
     public Payment run(final boolean isApiPayment,
@@ -274,7 +277,7 @@ public class PluginControlPaymentAutomatonRunner extends PaymentAutomatonRunner 
 
     public Payment completeRun(final PaymentStateControlContext paymentStateContext) throws PaymentApiException {
         try {
-            final OperationCallback callback = new CompletionControlOperation(locker, paymentPluginDispatcher, paymentConfig, paymentStateContext, paymentProcessor, controlPluginRunner);
+            final OperationCallback callback = new CompletionControlOperation(locker, paymentPluginDispatcher, paymentConfig, paymentStateContext, paymentRefresher, paymentProcessor, controlPluginRunner);
             final LeavingStateCallback leavingStateCallback = new NoopControlInitiated();
             final EnteringStateCallback enteringStateCallback = new DefaultControlCompleted(this, paymentStateContext, paymentControlStateMachineHelper.getRetriedState(), retryServiceScheduler);
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
index bfbde89..d319207 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/MockRetryablePaymentAutomatonRunner.java
@@ -39,6 +39,7 @@ import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.payment.core.PaymentExecutors;
 import org.killbill.billing.payment.core.PaymentPluginServiceRegistration;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.PaymentRefresher;
 import org.killbill.billing.payment.core.sm.control.ControlPluginRunner;
 import org.killbill.billing.payment.core.sm.control.PaymentStateControlContext;
 import org.killbill.billing.payment.dao.PaymentDao;
@@ -61,8 +62,8 @@ public class MockRetryablePaymentAutomatonRunner extends PluginControlPaymentAut
     @Inject
     public MockRetryablePaymentAutomatonRunner(final PaymentDao paymentDao, final GlobalLocker locker, final PaymentPluginServiceRegistration paymentPluginServiceRegistration, final OSGIServiceRegistration<PaymentControlPluginApi> retryPluginRegistry, final Clock clock, final TagInternalApi tagApi, final PaymentProcessor paymentProcessor,
                                                @Named(RETRYABLE_NAMED) final RetryServiceScheduler retryServiceScheduler, final PaymentConfig paymentConfig, final PaymentExecutors executors,
-                                               final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper retrySMHelper, final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus) {
-        super(paymentDao, locker, paymentPluginServiceRegistration, retryPluginRegistry, clock, paymentProcessor, retryServiceScheduler, paymentConfig, executors, paymentSMHelper, retrySMHelper, controlPluginRunner, eventBus);
+                                               final PaymentStateMachineHelper paymentSMHelper, final PaymentControlStateMachineHelper retrySMHelper, final ControlPluginRunner controlPluginRunner, final PersistentBus eventBus, final PaymentRefresher paymentRefresher) {
+        super(paymentDao, locker, paymentPluginServiceRegistration, retryPluginRegistry, clock, paymentProcessor, retryServiceScheduler, paymentConfig, executors, paymentSMHelper, retrySMHelper, controlPluginRunner, eventBus, paymentRefresher);
     }
 
     @Override
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
index 74f67f9..b50995c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/sm/TestRetryablePayment.java
@@ -178,7 +178,8 @@ public class TestRetryablePayment extends PaymentTestSuiteNoDB {
                 paymentSMHelper,
                 retrySMHelper,
                 controlPluginRunner,
-                eventBus);
+                eventBus,
+                paymentRefresher);
 
         paymentStateContext =
                 new PaymentStateControlContext(ImmutableList.<String>of(MockPaymentControlProviderPlugin.PLUGIN_NAME),
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentProcessor.java b/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentProcessor.java
index 92470f0..ca86a83 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentProcessor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/TestPaymentProcessor.java
@@ -92,7 +92,7 @@ public class TestPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
 
         mockPaymentProviderPlugin.overridePaymentPluginStatus(paymentId, authorization.getTransactions().get(0).getId(), PaymentPluginStatus.PROCESSED);
 
-        final List<Payment> payments = paymentProcessor.getAccountPayments(account.getId(), true, false, callContext, internalCallContext);
+        final List<Payment> payments = paymentControlAwareRefresher.getAccountPayments(account.getId(), true, false, callContext, internalCallContext);
         Assert.assertEquals(payments.size(), 1);
         verifyPayment(payments.get(0), paymentExternalKey, TEN, ZERO, ZERO, 1);
         verifyPaymentTransaction(payments.get(0).getTransactions().get(0), authorizationKey, TransactionType.AUTHORIZE, TEN, paymentId);
@@ -227,7 +227,7 @@ public class TestPaymentProcessor extends PaymentTestSuiteWithEmbeddedDB {
         } catch (final PaymentApiException e) {
             Assert.assertEquals(e.getCode(), ErrorCode.PAYMENT_INVALID_OPERATION.getCode());
         }
-        final Payment refreshedPayment = paymentProcessor.getPayment(authorization.getId(), false, false, PLUGIN_PROPERTIES, callContext, internalCallContext);
+        final Payment refreshedPayment = paymentRefresher.getPayment(authorization.getId(), false, false, PLUGIN_PROPERTIES, callContext, internalCallContext);
         // Make sure no state has been created (no UNKNOWN transaction for the refund)
         verifyPayment(refreshedPayment, paymentExternalKey, ZERO, ZERO, ZERO, 1);
         paymentBusListener.verify(0, 1, 0, account.getId(), paymentId, ZERO, TransactionStatus.PAYMENT_FAILURE);
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
index ed3df88..4f451dc 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteNoDB.java
@@ -36,6 +36,7 @@ import org.killbill.billing.payment.core.PaymentExecutors;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentPluginServiceRegistration;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.PaymentRefresher;
 import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlPaymentAutomatonRunner;
@@ -96,6 +97,8 @@ public abstract class PaymentTestSuiteNoDB extends GuicyKillbillTestSuiteNoDB {
     @Inject
     protected PaymentStateMachineHelper paymentSMHelper;
     @Inject
+    protected PaymentRefresher paymentRefresher;
+    @Inject
     protected PaymentProcessor paymentProcessor;
     @Inject
     protected PluginControlPaymentProcessor pluginControlPaymentProcessor;
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
index 82ee233..db0e354 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
@@ -28,10 +28,12 @@ import org.killbill.billing.payment.api.InvoicePaymentApi;
 import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentGatewayApi;
 import org.killbill.billing.payment.caching.StateMachineConfigCache;
+import org.killbill.billing.payment.core.PaymentControlAwareRefresher;
 import org.killbill.billing.payment.core.PaymentExecutors;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentPluginServiceRegistration;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.PaymentRefresher;
 import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
@@ -67,6 +69,10 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected PaymentMethodProcessor paymentMethodProcessor;
     @Inject
+    protected PaymentRefresher paymentRefresher;
+    @Inject
+    protected PaymentControlAwareRefresher paymentControlAwareRefresher;
+    @Inject
     protected PaymentProcessor paymentProcessor;
     @Inject
     protected DefaultRetryService retryService;
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index d1da4a7..0381d75 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -86,7 +86,7 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
     }
 
     private Payment getPaymentForExternalKey(final String externalKey) throws PaymentApiException {
-        final Payment payment = paymentProcessor.getPaymentByExternalKey(externalKey, false, false, ImmutableList.<PluginProperty>of(), callContext, internalCallContext);
+        final Payment payment = paymentRefresher.getPaymentByExternalKey(externalKey, false, false, ImmutableList.<PluginProperty>of(), callContext, internalCallContext);
         return payment;
     }