killbill-memoizeit

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index 1073e71..ac1a218 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -19,14 +19,12 @@ package org.killbill.billing.payment.core.janitor;
 
 import java.util.List;
 
-import org.joda.time.DateTime;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
-import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.UUIDs;
@@ -44,9 +42,6 @@ abstract class CompletionTaskBase<T> implements Runnable {
 
     protected Logger log = LoggerFactory.getLogger(CompletionTaskBase.class);
 
-    private final Janitor janitor;
-    private final String taskName;
-
     protected final PaymentConfig paymentConfig;
     protected final Clock clock;
     protected final PaymentDao paymentDao;
@@ -54,14 +49,14 @@ abstract class CompletionTaskBase<T> implements Runnable {
     protected final PaymentStateMachineHelper paymentStateMachineHelper;
     protected final PaymentControlStateMachineHelper retrySMHelper;
     protected final AccountInternalApi accountInternalApi;
-    protected final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
 
-    public CompletionTaskBase(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
+    private volatile boolean isStopped;
+
+    public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                               final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                               final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                              final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        this.janitor = janitor;
+                              final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
         this.internalCallContextFactory = internalCallContextFactory;
         this.paymentConfig = paymentConfig;
         this.paymentDao = paymentDao;
@@ -69,23 +64,20 @@ abstract class CompletionTaskBase<T> implements Runnable {
         this.paymentStateMachineHelper = paymentStateMachineHelper;
         this.retrySMHelper = retrySMHelper;
         this.accountInternalApi = accountInternalApi;
-        this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
         this.pluginRegistry = pluginRegistry;
-        // Limit the length of the username in the context (limited to 50 characters)
-        this.taskName = this.getClass().getSimpleName();
+        this.isStopped = false;
     }
 
     @Override
     public void run() {
-
-        if (janitor.isStopped()) {
-            log.info("Janitor Task " + taskName + " was requested to stop");
+        if (isStopped) {
+            log.info("Janitor was requested to stop");
             return;
         }
         final List<T> items = getItemsForIteration();
         for (final T item : items) {
-            if (janitor.isStopped()) {
-                log.info("Janitor Task " + taskName + " was requested to stop");
+            if (isStopped) {
+                log.info("Janitor was requested to stop");
                 return;
             }
             try {
@@ -96,14 +88,16 @@ abstract class CompletionTaskBase<T> implements Runnable {
         }
     }
 
+    public synchronized void stop() {
+        this.isStopped = true;
+    }
+
     public abstract List<T> getItemsForIteration();
 
     public abstract void doIteration(final T item);
 
     protected CallContext createCallContext(final String taskName, final InternalTenantContext internalTenantContext) {
         final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
-        final CallContext callContext = new DefaultCallContext(tenantContext.getTenantId(), taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID(), clock);
-        return callContext;
+        return new DefaultCallContext(tenantContext.getTenantId(), taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID(), clock);
     }
-
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index 64220d0..ddd73dc 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -19,6 +19,8 @@ package org.killbill.billing.payment.core.janitor;
 
 import java.util.List;
 
+import javax.inject.Inject;
+
 import org.joda.time.DateTime;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
@@ -28,9 +30,9 @@ import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
-import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.control.PaymentStateControlContext;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentDao;
@@ -52,13 +54,17 @@ import com.google.common.collect.Iterables;
  * If the state of the transaction associated with the attempt completed, but the attempt state machine did not,
  * we rerun the retry state machine to complete the call and transition the attempt into a terminal state.
  */
-final class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttemptModelDao> {
+public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttemptModelDao> {
 
-    public IncompletePaymentAttemptTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
+    private final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
+
+    @Inject
+    public IncompletePaymentAttemptTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                         final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                         final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
                                         final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(janitor, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+        this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
     }
 
     @Override
@@ -97,18 +103,18 @@ final class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttem
             final Account account = accountInternalApi.getAccountById(attempt.getAccountId(), tenantContext);
             final boolean isApiPayment = true; // unclear
             final PaymentStateControlContext paymentStateContext = new PaymentStateControlContext(attempt.toPaymentControlPluginNames(),
-                                                                                                      isApiPayment,
-                                                                                                      transaction.getPaymentId(),
-                                                                                                      attempt.getPaymentExternalKey(),
-                                                                                                      transaction.getTransactionExternalKey(),
-                                                                                                      transaction.getTransactionType(),
-                                                                                                      account,
-                                                                                                      attempt.getPaymentMethodId(),
-                                                                                                      transaction.getAmount(),
-                                                                                                      transaction.getCurrency(),
-                                                                                                      PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
-                                                                                                      internalCallContext,
-                                                                                                      callContext);
+                                                                                                  isApiPayment,
+                                                                                                  transaction.getPaymentId(),
+                                                                                                  attempt.getPaymentExternalKey(),
+                                                                                                  transaction.getTransactionExternalKey(),
+                                                                                                  transaction.getTransactionType(),
+                                                                                                  account,
+                                                                                                  attempt.getPaymentMethodId(),
+                                                                                                  transaction.getAmount(),
+                                                                                                  transaction.getCurrency(),
+                                                                                                  PluginPropertySerializer.deserialize(attempt.getPluginProperties()),
+                                                                                                  internalCallContext,
+                                                                                                  callContext);
 
             paymentStateContext.setAttemptId(attempt.getId()); // Normally set by leavingState Callback
             paymentStateContext.setPaymentTransactionModelDao(transaction); // Normally set by raw state machine
@@ -130,5 +136,4 @@ final class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttem
         final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
         return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
     }
-
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index ede05e3..f7d87a1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -20,7 +20,7 @@ package org.killbill.billing.payment.core.janitor;
 import java.math.BigDecimal;
 import java.util.List;
 
-import javax.annotation.Nullable;
+import javax.inject.Inject;
 
 import org.joda.time.DateTime;
 import org.killbill.billing.account.api.AccountInternalApi;
@@ -33,18 +33,17 @@ import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.core.PaymentTransactionInfoPluginConverter;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.dao.PaymentMethodModelDao;
 import org.killbill.billing.payment.dao.PaymentModelDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.payment.plugin.api.PaymentPluginApiException;
 import org.killbill.billing.payment.plugin.api.PaymentPluginStatus;
 import org.killbill.billing.payment.plugin.api.PaymentTransactionInfoPlugin;
 import org.killbill.billing.payment.provider.DefaultNoOpPaymentInfoPlugin;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.clock.Clock;
 
@@ -56,17 +55,18 @@ import com.google.common.collect.Iterables;
 
 public class IncompletePaymentTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
 
-    private final static ImmutableList<TransactionStatus> TRANSACTION_STATUSES_TO_CONSIDER = ImmutableList.<TransactionStatus>builder()
+    private static final ImmutableList<TransactionStatus> TRANSACTION_STATUSES_TO_CONSIDER = ImmutableList.<TransactionStatus>builder()
                                                                                                           .add(TransactionStatus.PENDING)
                                                                                                           .add(TransactionStatus.UNKNOWN)
                                                                                                           .build();
-    private final static int MAX_ITEMS_PER_LOOP = 100;
+    private static final int MAX_ITEMS_PER_LOOP = 100;
 
-    public IncompletePaymentTransactionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
+    @Inject
+    public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                             final PaymentDao paymentDao, final Clock clock,
                                             final PaymentStateMachineHelper paymentStateMachineHelper, final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                                            final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(janitor, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+                                            final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
     }
 
     @Override
@@ -80,27 +80,25 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
 
     @Override
     public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
-
         final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
-        final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
+        final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
         final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
 
         final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
         final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
 
-
         final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
-                                                                                                     paymentTransaction.getId(),
-                                                                                                     paymentTransaction.getTransactionType(),
-                                                                                                     paymentTransaction.getAmount(),
-                                                                                                     paymentTransaction.getCurrency(),
-                                                                                                     paymentTransaction.getCreatedDate(),
-                                                                                                     paymentTransaction.getCreatedDate(),
-                                                                                                     PaymentPluginStatus.UNDEFINED,
-                                                                                                     null);
+                                                                                                          paymentTransaction.getId(),
+                                                                                                          paymentTransaction.getTransactionType(),
+                                                                                                          paymentTransaction.getAmount(),
+                                                                                                          paymentTransaction.getCurrency(),
+                                                                                                          paymentTransaction.getCreatedDate(),
+                                                                                                          paymentTransaction.getCreatedDate(),
+                                                                                                          PaymentPluginStatus.UNDEFINED,
+                                                                                                          null);
         PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
         try {
-            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), callContext);
+            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
             paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
                 @Override
                 public boolean apply(final PaymentTransactionInfoPlugin input) {
@@ -116,10 +114,20 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
             paymentTransactionInfoPlugin = undefinedPaymentTransaction;
         }
 
-        //
+        updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+    }
+
+    public boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+        final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
+
+        // Can happen in the GET case, see PaymentProcessor#toPayment
+        if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
+            // Nothing to do
+            return false;
+        }
+
         // First obtain the new transactionStatus,
         // Then compute the new paymentState; this one is mostly interesting in case of success (to compute the lastSuccessPaymentState below)
-        //
         final TransactionStatus transactionStatus = computeNewTransactionStatusFromPaymentTransactionInfoPlugin(paymentTransactionInfoPlugin, paymentTransaction.getTransactionStatus());
         final String newPaymentState;
         switch (transactionStatus) {
@@ -135,10 +143,10 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
             case PLUGIN_FAILURE:
             case UNKNOWN:
             default:
-                log.info("Janitor IncompletePaymentTransactionTask repairing payment {}, transaction {}, bail early...",
-                         new Object[]{payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus});
+                log.info("Janitor IncompletePaymentTransactionTask unable to repair payment {}, transaction {}: {} -> {}",
+                         payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
                 // We can't get anything interesting from the plugin...
-                return;
+                return false;
         }
 
         // Recompute new lastSuccessPaymentState. This is important to be able to allow new operations on the state machine (for e.g an AUTH_SUCCESS would now allow a CAPTURE operation)
@@ -155,16 +163,17 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
         final String gatewayError = paymentTransactionInfoPlugin != null ? paymentTransactionInfoPlugin.getGatewayError() : paymentTransaction.getGatewayErrorMsg();
 
         log.info("Janitor IncompletePaymentTransactionTask repairing payment {}, transaction {}, transitioning transactionStatus from {} -> {}",
-                 new Object[]{payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus});
+                 payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
 
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(payment.getAccountId(), callContext);
         paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, lastSuccessPaymentState,
                                                            paymentTransaction.getId(), transactionStatus, processedAmount, processedCurrency, gatewayErrorCode, gatewayError, internalCallContext);
 
+        return true;
     }
 
     // Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
-    private TransactionStatus computeNewTransactionStatusFromPaymentTransactionInfoPlugin(PaymentTransactionInfoPlugin input, final TransactionStatus currentTransactionStatus) {
+    private TransactionStatus computeNewTransactionStatusFromPaymentTransactionInfoPlugin(final PaymentTransactionInfoPlugin input, final TransactionStatus currentTransactionStatus) {
         final TransactionStatus newTransactionStatus = PaymentTransactionInfoPluginConverter.toTransactionStatus(input);
         return (newTransactionStatus != TransactionStatus.UNKNOWN) ? newTransactionStatus : currentTransactionStatus;
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 73eddf5..032b1a0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -23,17 +23,8 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
 import javax.inject.Named;
 
-import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.osgi.api.OSGIServiceRegistration;
-import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
-import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
-import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.glue.PaymentModule;
-import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
-import org.killbill.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,22 +45,14 @@ public class Janitor {
     private volatile boolean isStopped;
 
     @Inject
-    public Janitor(final AccountInternalApi accountInternalApi,
-                   final PaymentDao paymentDao,
-                   final PaymentConfig paymentConfig,
-                   final Clock clock,
-                   final InternalCallContextFactory internalCallContextFactory,
-                   final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
+    public Janitor(final PaymentConfig paymentConfig,
                    @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
-                   final PaymentStateMachineHelper paymentSMHelper,
-                   final PaymentControlStateMachineHelper retrySMHelper,
-                   final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+                   final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
+                   final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
         this.janitorExecutor = janitorExecutor;
         this.paymentConfig = paymentConfig;
-        this.incompletePaymentAttemptTask = new IncompletePaymentAttemptTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
-                                                               accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
-        this.incompletePaymentTransactionTask = new IncompletePaymentTransactionTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
-                                                                   accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+        this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
+        this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
         this.isStopped = false;
     }
 
@@ -95,6 +78,10 @@ public class Janitor {
             log.warn("Janitor is already in a stopped state");
             return;
         }
+
+        incompletePaymentAttemptTask.stop();
+        incompletePaymentTransactionTask.stop();
+
         try {
             /* Previously submitted tasks will be executed with shutdown(); when task executes as a result of shutdown being called
              * or because it was already in its execution loop, it will check for the volatile boolean isStopped flag and
@@ -113,8 +100,4 @@ public class Janitor {
             isStopped = true;
         }
     }
-
-    public boolean isStopped() {
-        return isStopped;
-    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index ffc9030..cf6afd1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -49,6 +49,7 @@ import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
 import org.killbill.billing.payment.core.sm.PaymentAutomatonRunner;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.dao.PaymentModelDao;
@@ -85,6 +86,7 @@ public class PaymentProcessor extends ProcessorBase {
     private static final ImmutableList<PluginProperty> PLUGIN_PROPERTIES = ImmutableList.<PluginProperty>of();
 
     private final PaymentAutomatonRunner paymentAutomatonRunner;
+    private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
 
     private static final Logger log = LoggerFactory.getLogger(PaymentProcessor.class);
 
@@ -98,9 +100,11 @@ public class PaymentProcessor extends ProcessorBase {
                             final GlobalLocker locker,
                             @Named(PLUGIN_EXECUTOR_NAMED) final ExecutorService executor,
                             final PaymentAutomatonRunner paymentAutomatonRunner,
+                            final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
                             final Clock clock) {
         super(pluginRegistry, accountUserApi, paymentDao, tagUserApi, locker, executor, internalCallContextFactory, invoiceApi, clock);
         this.paymentAutomatonRunner = paymentAutomatonRunner;
+        this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
     }
 
     public Payment createAuthorization(final boolean isApiPayment, @Nullable final UUID attemptId, final Account account, @Nullable final UUID paymentMethodId, @Nullable final UUID paymentId, final BigDecimal amount, final Currency currency,
@@ -183,7 +187,7 @@ public class PaymentProcessor extends ProcessorBase {
                                                                      pluginInfo = getPaymentTransactionInfoPluginsIfNeeded(pluginApi, paymentModelDao, context);
                                                                  }
 
-                                                                 return toPayment(paymentModelDao, transactionsModelDao, pluginInfo);
+                                                                 return toPayment(paymentModelDao, transactionsModelDao, pluginInfo, tenantContext);
                                                              }
                                                          });
     }
@@ -383,11 +387,11 @@ public class PaymentProcessor extends ProcessorBase {
         final InternalTenantContext tenantContextWithAccountRecordId = getInternalTenantContextWithAccountRecordId(paymentModelDao.getAccountId(), tenantContext);
         final List<PaymentTransactionModelDao> transactionsForPayment = paymentDao.getTransactionsForPayment(paymentModelDao.getId(), tenantContextWithAccountRecordId);
 
-        return toPayment(paymentModelDao, transactionsForPayment, pluginTransactions);
+        return toPayment(paymentModelDao, transactionsForPayment, pluginTransactions, tenantContext);
     }
 
     // Used in bulk get API (getAccountPayments)
-    private Payment toPayment(final PaymentModelDao curPaymentModelDao, final Iterable<PaymentTransactionModelDao> transactionsModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions) {
+    private Payment toPayment(final PaymentModelDao curPaymentModelDao, final Iterable<PaymentTransactionModelDao> curTransactionsModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions, final InternalTenantContext internalTenantContext) {
         final Ordering<PaymentTransaction> perPaymentTransactionOrdering = Ordering.<PaymentTransaction>from(new Comparator<PaymentTransaction>() {
             @Override
             public int compare(final PaymentTransaction o1, final PaymentTransaction o2) {
@@ -396,34 +400,71 @@ public class PaymentProcessor extends ProcessorBase {
         });
 
         // Need to filter for optimized codepaths looking up by account_record_id
-        final Iterable<PaymentTransactionModelDao> filteredTransactions = Iterables.filter(transactionsModelDao, new Predicate<PaymentTransactionModelDao>() {
+        final Iterable<PaymentTransactionModelDao> filteredTransactions = Iterables.filter(curTransactionsModelDao, new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao curPaymentTransactionModelDao) {
                 return curPaymentTransactionModelDao.getPaymentId().equals(curPaymentModelDao.getId());
             }
         });
 
-        final Iterable<PaymentTransaction> transactions = Iterables.transform(filteredTransactions, new Function<PaymentTransactionModelDao, PaymentTransaction>() {
-            @Override
-            public PaymentTransaction apply(final PaymentTransactionModelDao paymentTransactionModelDao) {
-                final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = pluginTransactions != null ?
-                                                                                  Iterables.tryFind(pluginTransactions, new Predicate<PaymentTransactionInfoPlugin>() {
-                                                                                      @Override
-                                                                                      public boolean apply(final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin) {
-                                                                                          return paymentTransactionModelDao.getId().equals(paymentTransactionInfoPlugin.getKbTransactionPaymentId());
-                                                                                      }
-                                                                                  }).orNull() : null;
-
-                return new DefaultPaymentTransaction(paymentTransactionModelDao.getId(), paymentTransactionModelDao.getAttemptId(), paymentTransactionModelDao.getTransactionExternalKey(), paymentTransactionModelDao.getCreatedDate(), paymentTransactionModelDao.getUpdatedDate(), paymentTransactionModelDao.getPaymentId(),
-                                                     paymentTransactionModelDao.getTransactionType(), paymentTransactionModelDao.getEffectiveDate(), paymentTransactionModelDao.getTransactionStatus(), paymentTransactionModelDao.getAmount(), paymentTransactionModelDao.getCurrency(),
-                                                     paymentTransactionModelDao.getProcessedAmount(), paymentTransactionModelDao.getProcessedCurrency(),
-                                                     paymentTransactionModelDao.getGatewayErrorCode(), paymentTransactionModelDao.getGatewayErrorMsg(), paymentTransactionInfoPlugin);
+        PaymentModelDao newPaymentModelDao = curPaymentModelDao;
+        final Collection<PaymentTransaction> transactions = new LinkedList<PaymentTransaction>();
+        for (final PaymentTransactionModelDao curPaymentTransactionModelDao : filteredTransactions) {
+            PaymentTransactionModelDao newPaymentTransactionModelDao = curPaymentTransactionModelDao;
+
+            final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
+            if (paymentTransactionInfoPlugin != null) {
+                // Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
+                // See https://github.com/killbill/killbill/issues/341
+                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeeded(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
+                if (hasChanged) {
+                    newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
+                    newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
+                }
             }
-        });
+
+            final PaymentTransaction transaction = new DefaultPaymentTransaction(newPaymentTransactionModelDao.getId(),
+                                                                                 newPaymentTransactionModelDao.getAttemptId(),
+                                                                                 newPaymentTransactionModelDao.getTransactionExternalKey(),
+                                                                                 newPaymentTransactionModelDao.getCreatedDate(),
+                                                                                 newPaymentTransactionModelDao.getUpdatedDate(),
+                                                                                 newPaymentTransactionModelDao.getPaymentId(),
+                                                                                 newPaymentTransactionModelDao.getTransactionType(),
+                                                                                 newPaymentTransactionModelDao.getEffectiveDate(),
+                                                                                 newPaymentTransactionModelDao.getTransactionStatus(),
+                                                                                 newPaymentTransactionModelDao.getAmount(),
+                                                                                 newPaymentTransactionModelDao.getCurrency(),
+                                                                                 newPaymentTransactionModelDao.getProcessedAmount(),
+                                                                                 newPaymentTransactionModelDao.getProcessedCurrency(),
+                                                                                 newPaymentTransactionModelDao.getGatewayErrorCode(),
+                                                                                 newPaymentTransactionModelDao.getGatewayErrorMsg(),
+                                                                                 paymentTransactionInfoPlugin);
+            transactions.add(transaction);
+        }
 
         final List<PaymentTransaction> sortedTransactions = perPaymentTransactionOrdering.immutableSortedCopy(transactions);
-        return new DefaultPayment(curPaymentModelDao.getId(), curPaymentModelDao.getCreatedDate(), curPaymentModelDao.getUpdatedDate(), curPaymentModelDao.getAccountId(),
-                                  curPaymentModelDao.getPaymentMethodId(), curPaymentModelDao.getPaymentNumber(), curPaymentModelDao.getExternalKey(), sortedTransactions);
+        return new DefaultPayment(curPaymentModelDao.getId(),
+                                  curPaymentModelDao.getCreatedDate(),
+                                  curPaymentModelDao.getUpdatedDate(),
+                                  curPaymentModelDao.getAccountId(),
+                                  curPaymentModelDao.getPaymentMethodId(),
+                                  curPaymentModelDao.getPaymentNumber(),
+                                  curPaymentModelDao.getExternalKey(),
+                                  sortedTransactions);
+    }
+
+    private PaymentTransactionInfoPlugin findPaymentTransactionInfoPlugin(final PaymentTransactionModelDao paymentTransactionModelDao, @Nullable final Iterable<PaymentTransactionInfoPlugin> pluginTransactions) {
+        if (pluginTransactions == null) {
+            return null;
+        }
+
+        return Iterables.tryFind(pluginTransactions,
+                                 new Predicate<PaymentTransactionInfoPlugin>() {
+                                     @Override
+                                     public boolean apply(final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin) {
+                                         return paymentTransactionModelDao.getId().equals(paymentTransactionInfoPlugin.getKbTransactionPaymentId());
+                                     }
+                                 }).orNull();
     }
 
     private InternalTenantContext getInternalTenantContextWithAccountRecordId(final UUID accountId, final InternalTenantContext tenantContext) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 4bd9153..8053d33 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -37,18 +37,20 @@ import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentGatewayApi;
 import org.killbill.billing.payment.api.PaymentService;
 import org.killbill.billing.payment.bus.InvoiceHandler;
-import org.killbill.billing.payment.invoice.PaymentTagHandler;
-import org.killbill.billing.payment.invoice.dao.InvoicePaymentRoutingDao;
-import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.core.PaymentGatewayProcessor;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
 import org.killbill.billing.payment.core.PluginRoutingPaymentProcessor;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentAttemptTask;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
+import org.killbill.billing.payment.core.janitor.Janitor;
+import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
-import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.dao.DefaultPaymentDao;
 import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.invoice.PaymentTagHandler;
+import org.killbill.billing.payment.invoice.dao.InvoicePaymentRoutingDao;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.payment.retry.BaseRetryService.RetryServiceScheduler;
 import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -101,6 +103,8 @@ public class PaymentModule extends KillBillModule {
         final ScheduledExecutorService janitorExecutor = org.killbill.commons.concurrent.Executors.newSingleThreadScheduledExecutor("PaymentJanitor");
         bind(ScheduledExecutorService.class).annotatedWith(Names.named(JANITOR_EXECUTOR_NAMED)).toInstance(janitorExecutor);
 
+        bind(IncompletePaymentTransactionTask.class).asEagerSingleton();
+        bind(IncompletePaymentAttemptTask.class).asEagerSingleton();
         bind(Janitor.class).asEagerSingleton();
     }