killbill-aplcache

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index ac1a218..81672ff 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -19,10 +19,13 @@ package org.killbill.billing.payment.core.janitor;
 
 import java.util.List;
 
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.core.ProcessorBase;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
@@ -34,7 +37,11 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.globallocker.LockerType;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.commons.locker.LockFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,13 +57,14 @@ abstract class CompletionTaskBase<T> implements Runnable {
     protected final PaymentControlStateMachineHelper retrySMHelper;
     protected final AccountInternalApi accountInternalApi;
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
+    protected final GlobalLocker locker;
 
     private volatile boolean isStopped;
 
     public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                               final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                               final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                              final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+                              final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
         this.internalCallContextFactory = internalCallContextFactory;
         this.paymentConfig = paymentConfig;
         this.paymentDao = paymentDao;
@@ -65,6 +73,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
         this.retrySMHelper = retrySMHelper;
         this.accountInternalApi = accountInternalApi;
         this.pluginRegistry = pluginRegistry;
+        this.locker = locker;
         this.isStopped = false;
     }
 
@@ -96,6 +105,28 @@ abstract class CompletionTaskBase<T> implements Runnable {
 
     public abstract void doIteration(final T item);
 
+    public interface JanitorIterationCallback {
+        public <T>  T doIteration();
+    }
+
+    protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final Long accountRecordId, final InternalTenantContext internalTenantContext) {
+        GlobalLock lock = null;
+        try {
+            final Account account = accountInternalApi.getAccountByRecordId(accountRecordId, internalTenantContext);
+            lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), ProcessorBase.NB_LOCK_TRY);
+            return callback.doIteration();
+        } catch (AccountApiException e) {
+            log.warn(String.format("Janitor failed to retrieve account with recordId %s", accountRecordId), e);
+        } catch (LockFailedException e) {
+            log.warn(String.format("Janitor failed to grab account with recordId %s", accountRecordId), e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+        return null;
+    }
+
     protected CallContext createCallContext(final String taskName, final InternalTenantContext internalTenantContext) {
         final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
         return new DefaultCallContext(tenantContext.getTenantId(), taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID(), clock);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index ddd73dc..040b81b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -44,6 +44,7 @@ import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -62,8 +63,9 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
     public IncompletePaymentAttemptTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                         final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                         final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                                        final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+                                        final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
+                                        final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
         this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
     }
 
@@ -78,6 +80,7 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
 
     @Override
     public void doIteration(final PaymentAttemptModelDao attempt) {
+        // We don't grab account lock here as the lock will be taken when calling the completeRun API.
         final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getTenantRecordId(), attempt.getAccountRecordId());
         final CallContext callContext = createCallContext("AttemptCompletionJanitorTask", tenantContext);
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index f7d87a1..010b216 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -23,11 +23,14 @@ import java.util.List;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.ErrorCode;
+import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.core.PaymentTransactionInfoPluginConverter;
@@ -46,6 +49,7 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -65,8 +69,8 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
     public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                             final PaymentDao paymentDao, final Clock clock,
                                             final PaymentStateMachineHelper paymentStateMachineHelper, final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                                            final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+                                            final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
     }
 
     @Override
@@ -80,44 +84,60 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
 
     @Override
     public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+
         final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
-        final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
-        final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
-
-        final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
-        final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
-
-        final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
-                                                                                                          paymentTransaction.getId(),
-                                                                                                          paymentTransaction.getTransactionType(),
-                                                                                                          paymentTransaction.getAmount(),
-                                                                                                          paymentTransaction.getCurrency(),
-                                                                                                          paymentTransaction.getCreatedDate(),
-                                                                                                          paymentTransaction.getCreatedDate(),
-                                                                                                          PaymentPluginStatus.UNDEFINED,
-                                                                                                          null);
-        PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
-        try {
-            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
-            paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
-                @Override
-                public boolean apply(final PaymentTransactionInfoPlugin input) {
-                    return input.getKbTransactionPaymentId().equals(paymentTransaction.getId());
-                }
-            }).or(new Supplier<PaymentTransactionInfoPlugin>() {
-                @Override
-                public PaymentTransactionInfoPlugin get() {
-                    return undefinedPaymentTransaction;
+        doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+            @Override
+            public Void doIteration() {
+                final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
+                final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
+
+                final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
+                final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
+
+                final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
+                                                                                                                  paymentTransaction.getId(),
+                                                                                                                  paymentTransaction.getTransactionType(),
+                                                                                                                  paymentTransaction.getAmount(),
+                                                                                                                  paymentTransaction.getCurrency(),
+                                                                                                                  paymentTransaction.getCreatedDate(),
+                                                                                                                  paymentTransaction.getCreatedDate(),
+                                                                                                                  PaymentPluginStatus.UNDEFINED,
+                                                                                                                  null);
+                PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
+                try {
+                    final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
+                    paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
+                        @Override
+                        public boolean apply(final PaymentTransactionInfoPlugin input) {
+                            return input.getKbTransactionPaymentId().equals(paymentTransaction.getId());
+                        }
+                    }).or(new Supplier<PaymentTransactionInfoPlugin>() {
+                        @Override
+                        public PaymentTransactionInfoPlugin get() {
+                            return undefinedPaymentTransaction;
+                        }
+                    });
+                } catch (final Exception e) {
+                    paymentTransactionInfoPlugin = undefinedPaymentTransaction;
                 }
-            });
-        } catch (final Exception e) {
-            paymentTransactionInfoPlugin = undefinedPaymentTransaction;
-        }
+                updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+                return null;
+            }
+        }, paymentTransaction.getAccountRecordId(), internalTenantContext);
+
+    }
 
-        updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+    public Boolean updatePaymentAndTransactionIfNeededWithAccountLock(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+        return doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+            @Override
+            public Boolean doIteration() {
+                return updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+            }
+        }, internalTenantContext.getAccountRecordId(), internalTenantContext);
     }
 
-    public boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+    private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
         final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
 
         // Can happen in the GET case, see PaymentProcessor#toPayment
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index cf6afd1..69b2651 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -413,11 +413,11 @@ public class PaymentProcessor extends ProcessorBase {
             PaymentTransactionModelDao newPaymentTransactionModelDao = curPaymentTransactionModelDao;
 
             final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin = findPaymentTransactionInfoPlugin(newPaymentTransactionModelDao, pluginTransactions);
-            if (paymentTransactionInfoPlugin != null) {
+            if (paymentTransactionInfoPlugin != null && curPaymentTransactionModelDao.getTransactionStatus() == TransactionStatus.UNKNOWN) {
                 // Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
                 // See https://github.com/killbill/killbill/issues/341
-                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeeded(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
-                if (hasChanged) {
+                final Boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeededWithAccountLock(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
+                if (hasChanged != null && hasChanged) {
                     newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
                     newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
                 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
index 77a1265..ae4e3a2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Iterables;
 
 public abstract class ProcessorBase {
 
-    private static final int NB_LOCK_TRY = 5;
+    public static final int NB_LOCK_TRY = 5;
 
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
     protected final AccountInternalApi accountInternalApi;