killbill-aplcache
Changes
payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java 13(+10 -3)
Details
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index a6b0780..a526818 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -100,6 +100,10 @@ abstract class CompletionTaskBase<T> implements Runnable {
}
}
+ public synchronized void start() {
+ this.isStopped = false;
+ }
+
public synchronized void stop() {
this.isStopped = true;
}
@@ -119,15 +123,22 @@ abstract class CompletionTaskBase<T> implements Runnable {
}
protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
+ try {
+ return tryToDoJanitorOperationWithAccountLock(callback, internalTenantContext);
+ } catch (final LockFailedException e) {
+ log.warn("Error locking accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
+ }
+ return null;
+ }
+
+ protected <T> T tryToDoJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) throws LockFailedException {
GlobalLock lock = null;
try {
final ImmutableAccountData account = accountInternalApi.getImmutableAccountDataByRecordId(internalTenantContext.getAccountRecordId(), internalTenantContext);
lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), paymentConfig.getMaxGlobalLockRetries());
return callback.doIteration();
- } catch (AccountApiException e) {
+ } catch (final AccountApiException e) {
log.warn("Error retrieving accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
- } catch (LockFailedException e) {
- log.warn("Error locking accountRecordId='{}'", internalTenantContext.getAccountRecordId(), e);
} finally {
if (lock != null) {
lock.release();
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index 5fd6dd2..72f050f 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -51,6 +51,7 @@ import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.config.definition.PaymentConfig;
import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.commons.locker.LockFailedException;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
import org.skife.config.TimeSpan;
@@ -93,12 +94,19 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
}
public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
+ try {
+ tryToProcessNotification(notificationKey, userToken, accountRecordId, tenantRecordId);
+ } catch (final LockFailedException e) {
+ log.warn("Error locking accountRecordId='{}', will attempt to retry later", accountRecordId, e);
+ insertNewNotificationForUnresolvedTransactionIfNeeded(notificationKey.getUuidKey(), notificationKey.getAttemptNumber(), userToken, accountRecordId, tenantRecordId);
+ }
+ }
+ public void tryToProcessNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) throws LockFailedException {
final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
- doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+ tryToDoJanitorOperationWithAccountLock(new JanitorIterationCallback() {
@Override
public Void doIteration() {
-
// State may have changed since we originally retrieved with no lock
final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
@@ -139,7 +147,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
return null;
}
}, internalTenantContext);
-
}
@Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 30456c4..cb340c6 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -24,19 +24,10 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.joda.time.DateTime;
-import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.events.PaymentInternalEvent;
-import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.core.PaymentExecutors;
-import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
-import org.killbill.billing.payment.core.sm.PluginControlPaymentAutomatonRunner;
-import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.glue.DefaultPaymentService;
-import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.config.definition.PaymentConfig;
-import org.killbill.clock.Clock;
import org.killbill.commons.locker.GlobalLocker;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
@@ -60,55 +51,28 @@ public class Janitor {
private final NotificationQueueService notificationQueueService;
private final PaymentConfig paymentConfig;
private final PaymentExecutors paymentExecutors;
- private final Clock clock;
- private final PaymentDao paymentDao;
- private final InternalCallContextFactory internalCallContextFactory;
- private final PaymentStateMachineHelper paymentStateMachineHelper;
- private final PaymentControlStateMachineHelper retrySMHelper;
- private final AccountInternalApi accountInternalApi;
- private final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
- private final GlobalLocker locker;
- private final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
-
-
-
- private IncompletePaymentAttemptTask incompletePaymentAttemptTask;
- private IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+ private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
+ private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
private NotificationQueue janitorQueue;
private ScheduledExecutorService janitorExecutor;
private volatile boolean isStopped;
@Inject
- public Janitor(final InternalCallContextFactory internalCallContextFactory,
- final PaymentDao paymentDao,
- final Clock clock,
- final PaymentStateMachineHelper paymentStateMachineHelper,
- final PaymentControlStateMachineHelper retrySMHelper,
- final AccountInternalApi accountInternalApi,
- final PluginControlPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
- final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry,
+ public Janitor(final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
+ final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
final GlobalLocker locker,
final PaymentConfig paymentConfig,
final NotificationQueueService notificationQueueService,
final PaymentExecutors paymentExecutors) {
+ this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
+ this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
this.notificationQueueService = notificationQueueService;
this.paymentExecutors = paymentExecutors;
this.paymentConfig = paymentConfig;
- this.internalCallContextFactory = internalCallContextFactory;
- this.paymentDao = paymentDao;
- this.clock = clock;
- this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
- this.paymentStateMachineHelper = paymentStateMachineHelper;
- this.retrySMHelper = retrySMHelper;
- this.accountInternalApi = accountInternalApi;
- this.pluginRegistry = pluginRegistry;
- this.locker = locker;
-
}
-
public void initialize() throws NotificationQueueAlreadyExists {
janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
QUEUE_NAME,
@@ -128,36 +92,16 @@ public class Janitor {
}
);
- this.incompletePaymentAttemptTask = new IncompletePaymentAttemptTask(internalCallContextFactory,
- paymentConfig,
- paymentDao,
- clock,
- paymentStateMachineHelper,
- retrySMHelper,
- accountInternalApi,
- pluginControlledPaymentAutomatonRunner,
- pluginRegistry,
- locker);
-
- this.incompletePaymentTransactionTask = new IncompletePaymentTransactionTask(internalCallContextFactory,
- paymentConfig,
- paymentDao,
- clock,
- paymentStateMachineHelper,
- retrySMHelper,
- accountInternalApi,
- pluginRegistry,
- locker);
-
-
incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
}
public void start() {
-
this.isStopped = false;
+ incompletePaymentAttemptTask.start();
+ incompletePaymentTransactionTask.start();
+
janitorExecutor = paymentExecutors.getJanitorExecutorService();
janitorQueue.startQueue();
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 9b066d3..0a31eee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -41,6 +41,8 @@ import org.killbill.billing.payment.core.PaymentGatewayProcessor;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
import org.killbill.billing.payment.core.PluginControlPaymentProcessor;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentAttemptTask;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
@@ -127,6 +129,8 @@ public class PaymentModule extends KillBillModule {
}
protected void installProcessors(final PaymentConfig paymentConfig) {
+ bind(IncompletePaymentAttemptTask.class).asEagerSingleton();
+ bind(IncompletePaymentTransactionTask.class).asEagerSingleton();
bind(PaymentProcessor.class).asEagerSingleton();
bind(PluginControlPaymentProcessor.class).asEagerSingleton();
bind(PaymentGatewayProcessor.class).asEagerSingleton();
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
new file mode 100644
index 0000000..c764c8a
--- /dev/null
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.UUID;
+
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.PaymentTestSuiteWithEmbeddedDB;
+import org.killbill.billing.payment.api.Payment;
+import org.killbill.billing.payment.api.PaymentApiException;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.plugin.api.PaymentPluginStatus;
+import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
+import org.killbill.billing.util.globallocker.LockerType;
+import org.killbill.commons.locker.GlobalLock;
+import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuiteWithEmbeddedDB {
+
+ private MockPaymentProviderPlugin mockPaymentProviderPlugin;
+ private Account account;
+
+ @BeforeClass(groups = "slow")
+ protected void beforeClass() throws Exception {
+ super.beforeClass();
+
+ mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+ }
+
+ @BeforeMethod(groups = "slow")
+ public void beforeMethod() throws Exception {
+ super.beforeMethod();
+
+ mockPaymentProviderPlugin.clear();
+ account = testHelper.createTestAccount(UUID.randomUUID().toString(), true);
+ }
+
+ @Test(groups = "slow", description = "https://github.com/killbill/killbill/issues/675")
+ public void testHandleLockExceptions() throws PaymentApiException {
+ final Payment payment = paymentApi.createPurchase(account,
+ account.getPaymentMethodId(),
+ null,
+ BigDecimal.TEN,
+ Currency.EUR,
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString(),
+ ImmutableList.<PluginProperty>of(new PluginProperty(MockPaymentProviderPlugin.PLUGIN_PROPERTY_PAYMENT_PLUGIN_STATUS_OVERRIDE, PaymentPluginStatus.PENDING.toString(), false)),
+ callContext);
+
+ final UUID transactionId = payment.getTransactions().get(0).getId();
+ final JanitorNotificationKey notificationKey = new JanitorNotificationKey(transactionId, incompletePaymentTransactionTask.getClass().toString(), 1);
+ final UUID userToken = UUID.randomUUID();
+
+ Assert.assertTrue(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).isEmpty());
+
+ GlobalLock lock = null;
+ try {
+ lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), paymentConfig.getMaxGlobalLockRetries());
+
+ incompletePaymentTransactionTask.processNotification(notificationKey, userToken, internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+
+ final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ Assert.assertFalse(futureNotifications.isEmpty());
+ Assert.assertEquals(futureNotifications.get(0).getUserToken(), userToken);
+ Assert.assertEquals(futureNotifications.get(0).getEvent().getClass(), JanitorNotificationKey.class);
+ final JanitorNotificationKey event = (JanitorNotificationKey) futureNotifications.get(0).getEvent();
+ Assert.assertEquals(event.getUuidKey(), transactionId);
+ Assert.assertEquals((int) event.getAttemptNumber(), 2);
+
+ // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
+ Assert.assertTrue(futureNotifications.get(0).getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
+ } catch (final LockFailedException e) {
+ Assert.fail();
+ } finally {
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ }
+}
diff --git a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
index 73cc4ea..4f08145 100644
--- a/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/PaymentTestSuiteWithEmbeddedDB.java
@@ -30,6 +30,8 @@ import org.killbill.billing.payment.caching.StateMachineConfigCache;
import org.killbill.billing.payment.core.PaymentExecutors;
import org.killbill.billing.payment.core.PaymentMethodProcessor;
import org.killbill.billing.payment.core.PaymentProcessor;
+import org.killbill.billing.payment.core.janitor.IncompletePaymentTransactionTask;
+import org.killbill.billing.payment.core.janitor.Janitor;
import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
import org.killbill.billing.payment.dao.PaymentDao;
import org.killbill.billing.payment.glue.PaymentModule;
@@ -40,6 +42,7 @@ import org.killbill.billing.platform.api.KillbillConfigSource;
import org.killbill.billing.util.config.definition.PaymentConfig;
import org.killbill.billing.util.dao.NonEntityDao;
import org.killbill.bus.api.PersistentBus;
+import org.killbill.commons.locker.GlobalLocker;
import org.killbill.commons.profiling.Profiling;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -88,6 +91,12 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
protected NonEntityDao nonEntityDao;
@Inject
protected StateMachineConfigCache stateMachineConfigCache;
+ @Inject
+ protected Janitor janitor;
+ @Inject
+ protected IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+ @Inject
+ protected GlobalLocker locker;
@Override
protected KillbillConfigSource getConfigSource() {
@@ -113,10 +122,14 @@ public abstract class PaymentTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
eventBus.start();
Profiling.resetPerThreadProfilingData();
clock.resetDeltaFromReality();
+
+ janitor.initialize();
+ janitor.start();
}
@AfterMethod(groups = "slow")
public void afterMethod() throws Exception {
+ janitor.stop();
eventBus.stop();
paymentExecutors.stop();
}
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index c248d23..77697da 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -101,8 +101,6 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
@Inject
protected NotificationQueueService notificationQueueService;
@Inject
- private Janitor janitor;
- @Inject
private PaymentBusEventHandler handler;
private MockPaymentProviderPlugin mockPaymentProviderPlugin;
@@ -123,15 +121,10 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
}
- @AfterClass(groups = "slow")
- protected void afterClass() throws Exception {
- }
-
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
super.beforeMethod();
- janitor.initialize();
- janitor.start();
+
eventBus.register(handler);
testListener.reset();
eventBus.register(testListener);
@@ -145,7 +138,6 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
public void afterMethod() throws Exception {
testListener.assertListenerStatus();
- janitor.stop();
eventBus.unregister(handler);
eventBus.unregister(testListener);
super.afterMethod();