killbill-aplcache

payment: make IncompletePaymentTransactionTask resilient

12/13/2016 7:07:23 AM

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index a6b0780..a526818 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -100,6 +100,10 @@ abstract class CompletionTaskBase<T> implements Runnable {
         }
     }
 
+    public synchronized void start() {
+        this.isStopped = false;
+    }
+
     public synchronized void stop() {
         this.isStopped = true;
     }
@@ -119,15 +123,22 @@ abstract class CompletionTaskBase<T> implements Runnable {
     }
 
     protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
+        try {
+            return tryToDoJanitorOperationWithAccountLock(callback, internalTenantContext);
+        } catch (final LockFailedException e) {
+            log.warn("Error locking accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
+        }
+        return null;
+    }
+
+    protected <T> T tryToDoJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) throws LockFailedException {
         GlobalLock lock = null;
         try {
             final ImmutableAccountData account = accountInternalApi.getImmutableAccountDataByRecordId(internalTenantContext.getAccountRecordId(), internalTenantContext);
             lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), paymentConfig.getMaxGlobalLockRetries());
             return callback.doIteration();
-        } catch (AccountApiException e) {
+        } catch (final AccountApiException e) {
             log.warn("Error retrieving accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
-        } catch (LockFailedException e) {
-            log.warn("Error locking accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
         } finally {
             if (lock != null) {
                 lock.release();
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index 5fd6dd2..72f050f 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -51,6 +51,7 @@ import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.definition.PaymentConfig;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.commons.locker.LockFailedException;
 import org.killbill.notificationq.api.NotificationEvent;
 import org.killbill.notificationq.api.NotificationQueue;
 import org.skife.config.TimeSpan;
@@ -93,12 +94,19 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
     }
 
     public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
+        try {
+            tryToProcessNotification(notificationKey, userToken, accountRecordId, tenantRecordId);
+        } catch (final LockFailedException e) {
+            log.warn("Error locking accountRecordId='{}', will attempt to retry later", accountRecordId, e);
+            insertNewNotificationForUnresolvedTransactionIfNeeded(notificationKey.getUuidKey(), notificationKey.getAttemptNumber(), userToken, accountRecordId, tenantRecordId);
+        }
+    }
 
+    public void tryToProcessNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) throws LockFailedException {
         final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
-        doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+        tryToDoJanitorOperationWithAccountLock(new JanitorIterationCallback() {
             @Override
             public Void doIteration() {
-
                 // State may have changed since we originally retrieved with no lock
                 final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
 
@@ -139,7 +147,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                 return null;
             }
         }, internalTenantContext);
-
     }
 
     @Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 30456c4..cb340c6 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -24,19 +24,10 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
-import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.events.PaymentInternalEvent;
-import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.core.PaymentExecutors;
-import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PluginControlPaymentAutomatonRunner;
-import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.glue.DefaultPaymentService;
-import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.definition.PaymentConfig;
-import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
 import org.killbill.notificationq.api.NotificationEvent;
 import org.killbill.notificationq.api.NotificationQueue;
@@ -60,55 +51,28 @@ public class Janitor {
     private final NotificationQueueService notificationQueueService;
     private final PaymentConfig paymentConfig;
     private final PaymentExecutors paymentExecutors;
-    private final Clock clock;
-    private final PaymentDao paymentDao;
-    private final InternalCallContextFactory internalCallContextFactory;
-    private final PaymentStateMachineHelper paymentStateMachineHelper;
-    private final PaymentControlStateMachineHelper retrySMHelper;
-    private final AccountInternalApi accountInternalApi;
-    private final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
-    private final GlobalLocker locker;
-    private final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
 
-
-
-
-    private IncompletePaymentAttemptTask incompletePaymentAttemptTask;
-    private IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+    private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
+    private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
     private NotificationQueue janitorQueue;
     private ScheduledExecutorService janitorExecutor;
 
     private volatile boolean isStopped;
 
     @Inject
-    public Janitor(final InternalCallContextFactory internalCallContextFactory,
-                   final PaymentDao paymentDao,
-                   final Clock clock,
-                   final PaymentStateMachineHelper paymentStateMachineHelper,
-                   final PaymentControlStateMachineHelper retrySMHelper,
-                   final AccountInternalApi accountInternalApi,
-                   final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
-                   final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
+    public Janitor(final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
+                   final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
                    final GlobalLocker locker,
                    final PaymentConfig paymentConfig,
                    final NotificationQueueService notificationQueueService,
                    final PaymentExecutors paymentExecutors) {
+        this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
+        this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
         this.notificationQueueService = notificationQueueService;
         this.paymentExecutors = paymentExecutors;
         this.paymentConfig = paymentConfig;
-        this.internalCallContextFactory = internalCallContextFactory;
-        this.paymentDao = paymentDao;
-        this.clock = clock;
-        this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
-        this.paymentStateMachineHelper = paymentStateMachineHelper;
-        this.retrySMHelper = retrySMHelper;
-        this.accountInternalApi = accountInternalApi;
-        this.pluginRegistry = pluginRegistry;
-        this.locker = locker;
-
     }
 
-
     public void initialize() throws NotificationQueueAlreadyExists {
         janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
                                                                         QUEUE_NAME,
@@ -128,36 +92,16 @@ public class Janitor {
                                                                         }
                                                                        );
 
-        this.incompletePaymentAttemptTask = new IncompletePaymentAttemptTask(internalCallContextFactory,
-                                                                             paymentConfig,
-                                                                             paymentDao,
-                                                                             clock,
-                                                                             paymentStateMachineHelper,
-                                                                             retrySMHelper,
-                                                                             accountInternalApi,
-                                                                             pluginControlledPaymentAutomatonRunner,
-                                                                             pluginRegistry,
-                                                                             locker);
-
-        this.incompletePaymentTransactionTask = new IncompletePaymentTransactionTask(internalCallContextFactory,
-                                                                                     paymentConfig,
-                                                                                     paymentDao,
-                                                                                     clock,
-                                                                                     paymentStateMachineHelper,
-                                                                                     retrySMHelper,
-                                                                                     accountInternalApi,
-                                                                                     pluginRegistry,
-                                                                                     locker);
-
-
         incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
         incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
     }
 
     public void start() {
-
         this.isStopped = false;
 
+        incompletePaymentAttemptTask.start();
+        incompletePaymentTransactionTask.start();
+
         janitorExecutor = paymentExecutors.getJanitorExecutorService();
 
         janitorQueue.startQueue();
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 9b066d3..0a31eee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -41,6 +41,8 @@ import org.killbill.billing.payment.core.PaymentGatewayProcessor;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
 import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentAttemptTask;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
@@ -127,6 +129,8 @@ public class PaymentModule extends KillBillModule {
     }
 
     protected void installProcessors(final PaymentConfig paymentConfig) {
+        bind(IncompletePaymentAttemptTask.class).asEagerSingleton();
+        bind(IncompletePaymentTransactionTask.class).asEagerSingleton();
         bind(PaymentProcessor.class).asEagerSingleton();
         bind(PluginControlPaymentProcessor.class).asEagerSingleton();
         bind(PaymentGatewayProcessor.class).asEagerSingleton();
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
new file mode 100644
index 0000000..c764c8a
--- /dev/null
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.UUID;
+
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.PaymentTestSuiteWithEmbeddedDB;
+import org.killbill.billing.payment.api.Payment;
+import org.killbill.billing.payment.api.PaymentApiException;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.plugin.api.PaymentPluginStatus;
+import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
+import org.killbill.billing.util.globallocker.LockerType;
+import org.killbill.commons.locker.GlobalLock;
+import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuiteWithEmbeddedDB {
+
+    private MockPaymentProviderPlugin mockPaymentProviderPlugin;
+    private Account account;
+
+    @BeforeClass(groups = "slow")
+    protected void beforeClass() throws Exception {
+        super.beforeClass();
+
+        mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void beforeMethod() throws Exception {
+        super.beforeMethod();
+
+        mockPaymentProviderPlugin.clear();
+        account = testHelper.createTestAccount(UUID.randomUUID().toString(), true);
+    }
+
+    @Test(groups = "slow", description = "https://github.com/killbill/killbill/issues/675")
+    public void testHandleLockExceptions() throws PaymentApiException {
+        final Payment payment = paymentApi.createPurchase(account,
+                                                          account.getPaymentMethodId(),
+                                                          null,
+                                                          BigDecimal.TEN,
+                                                          Currency.EUR,
+                                                          UUID.randomUUID().toString(),
+                                                          UUID.randomUUID().toString(),
+                                                          ImmutableList.<PluginProperty>of(new PluginProperty(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, PaymentPluginStatus.PENDING.toString(), false)),
+                                                          callContext);
+
+        final UUID transactionId = payment.getTransactions().get(0).getId();
+        final JanitorNotificationKey notificationKey = new JanitorNotificationKey(transactionId, incompletePaymentTransactionTask.getClass().toString(), 1);
+        final UUID userToken = UUID.randomUUID();
+
+        Assert.assertTrue(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).isEmpty());
+
+        GlobalLock lock = null;
+        try {
+            lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), paymentConfig.getMaxGlobalLockRetries());
+
+            incompletePaymentTransactionTask.processNotification(notificationKey, userToken, internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+
+            final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            Assert.assertFalse(futureNotifications.isEmpty());
+            Assert.assertEquals(futureNotifications.get(0).getUserToken(), userToken);
+            Assert.assertEquals(futureNotifications.get(0).getEvent().getClass(), JanitorNotificationKey.class);
+            final JanitorNotificationKey event = (JanitorNotificationKey) futureNotifications.get(0).getEvent();
+            Assert.assertEquals(event.getUuidKey(), transactionId);
+            Assert.assertEquals((int) event.getAttemptNumber(), 2);
+
+            // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
+            Assert.assertTrue(futureNotifications.get(0).getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
+        } catch (final LockFailedException e) {
+            Assert.fail();
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+    }
+}
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
index 73cc4ea..4f08145 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
@@ -30,6 +30,8 @@ import org.killbill.billing.payment.caching.StateMachineConfigCache;
 import org.killbill.billing.payment.core.PaymentExecutors;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
+import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.glue.PaymentModule;
@@ -40,6 +42,7 @@ import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.util.config.definition.PaymentConfig;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.bus.api.PersistentBus;
+import org.killbill.commons.locker.GlobalLocker;
 import org.killbill.commons.profiling.Profiling;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -88,6 +91,12 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     protected NonEntityDao nonEntityDao;
     @Inject
     protected StateMachineConfigCache stateMachineConfigCache;
+    @Inject
+    protected Janitor janitor;
+    @Inject
+    protected IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+    @Inject
+    protected GlobalLocker locker;
 
     @Override
     protected KillbillConfigSource getConfigSource() {
@@ -113,10 +122,14 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
         eventBus.start();
         Profiling.resetPerThreadProfilingData();
         clock.resetDeltaFromReality();
+
+        janitor.initialize();
+        janitor.start();
     }
 
     @AfterMethod(groups = "slow")
     public void afterMethod() throws Exception {
+        janitor.stop();
         eventBus.stop();
         paymentExecutors.stop();
     }
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index c248d23..77697da 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -101,8 +101,6 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     @Inject
     protected NotificationQueueService notificationQueueService;
     @Inject
-    private Janitor janitor;
-    @Inject
     private PaymentBusEventHandler handler;
     private MockPaymentProviderPlugin mockPaymentProviderPlugin;
 
@@ -123,15 +121,10 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
     }
 
-    @AfterClass(groups = "slow")
-    protected void afterClass() throws Exception {
-    }
-
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
-        janitor.initialize();
-        janitor.start();
+
         eventBus.register(handler);
         testListener.reset();
         eventBus.register(testListener);
@@ -145,7 +138,6 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     public void afterMethod() throws Exception {
         testListener.assertListenerStatus();
 
-        janitor.stop();
         eventBus.unregister(handler);
         eventBus.unregister(testListener);
         super.afterMethod();