Details
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
index 8bc737d..23aa0fa 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
@@ -21,48 +21,52 @@ import java.util.List;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
import org.killbill.billing.osgi.api.OSGIServiceRegistration;
import org.killbill.billing.payment.api.TransactionStatus;
import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.util.UUIDs;
-import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.config.PaymentConfig;
import org.killbill.clock.Clock;
-import com.google.common.collect.ImmutableList;
-
/**
* Task to find old PENDING transactions and move them into
*/
-final class PendingTransactionTask extends CompletionTaskBase<Integer> {
-
- private final List<Integer> itemsForIterations;
+final class PendingTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
public PendingTransactionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
final RetryStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
super(janitor, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
- this.itemsForIterations = ImmutableList.of(new Integer(1));
}
@Override
- public List<Integer> getItemsForIteration() {
- return itemsForIterations;
+ public List<PaymentTransactionModelDao> getItemsForIteration() {
+ return paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, getCreatedDateBefore());
}
@Override
- public void doIteration(final Integer item) {
- final InternalCallContext contextTemplate = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingTransactionTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID());
- int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), contextTemplate);
- if (result > 0) {
- log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments -> PLUGIN_FAILURE");
- }
+ public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
+ final CallContext callContext = createCallContext("PendingTransactionTask", internalTenantContext);
+ final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
+
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(payment.getAccountId(), callContext);
+
+ final String newPaymentState = paymentStateMachineHelper.getFailureStateForTransaction(paymentTransaction.getTransactionType());
+ paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+ paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+ paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+
+ log.info("Janitor PendingTransactionTask repairing payment {}, transaction {}", payment.getId(), paymentTransaction.getId());
}
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 75f8fe7..910db19 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -156,44 +156,17 @@ public class DefaultPaymentDao implements PaymentDao {
}
@Override
- public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext internalCallContextTemplate) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Integer>() {
+ public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+ return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
@Override
- public Integer inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+ public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
- final List<PaymentTransactionModelDao> oldPendingTransactions = transactional.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING.toString(), createdBeforeDate.toDate());
- if (oldPendingTransactions.size() > 0) {
-
- // Partition per tenant to compute a valid context
- final Map<Long, List<PaymentTransactionModelDao>> perTenantPendingTransactions = new HashMap<Long, List<PaymentTransactionModelDao>>();
- for (PaymentTransactionModelDao curPaymentTransactionModelDao : oldPendingTransactions) {
- List<PaymentTransactionModelDao> pendingTransactions = perTenantPendingTransactions.get(curPaymentTransactionModelDao.getTenantRecordId());
- if (pendingTransactions == null) {
- pendingTransactions = new LinkedList<PaymentTransactionModelDao>();
- perTenantPendingTransactions.put(curPaymentTransactionModelDao.getTenantRecordId(), pendingTransactions);
- }
- pendingTransactions.add(curPaymentTransactionModelDao);
- }
-
- int result = 0;
- for (final Long curTenantRecordId : perTenantPendingTransactions.keySet()) {
- final InternalCallContext validContext = new InternalCallContext(internalCallContextTemplate, -1L, curTenantRecordId);
- final Collection<String> perTenantPendingTransactionIds = Collections2.transform(perTenantPendingTransactions.get(curTenantRecordId), new Function<PaymentTransactionModelDao, String>() {
- @Nullable
- @Override
- public String apply(@Nullable final PaymentTransactionModelDao input) {
- return input.getId().toString();
- }
- });
- result += transactional.failOldPendingTransactions(perTenantPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), validContext);
- }
- return result;
- }
- return 0;
+ return transactional.getByTransactionStatusPriorDateAcrossTenants(transactionStatus.toString(), createdBeforeDate.toDate());
}
});
}
+
@Override
public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 76ac6c8..8f95b2e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,7 +30,7 @@ import org.killbill.billing.util.entity.Pagination;
public interface PaymentDao {
- public int failOldPendingTransactions(TransactionStatus newTransactionStatus, DateTime createdBeforeDate, InternalCallContext internalCallContextTemplate);
+ public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate);
public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index 2f1853f..19427e2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -56,12 +56,6 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@Bind("transactionStatus") final String transactionStatus,
@Bind("beforeCreatedDate") final Date beforeCreatedDate);
- @SqlUpdate
- @Audited(ChangeType.UPDATE)
- int failOldPendingTransactions(@UUIDCollectionBinder final Collection<String> pendingTransactionIds,
- @Bind("newTransactionStatus") final String newTransactionStatus,
- @BindBean final InternalCallContext context);
-
@SqlQuery
public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
@BindBean final InternalTenantContext context);
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index b14134f..9da6875 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -92,11 +92,3 @@ and created_date \< :beforeCreatedDate
>>
-failOldPendingTransactions(ids) ::= <<
-update <tableName()>
-set transaction_status = :newTransactionStatus
-, updated_by = :updatedBy
-, updated_date = :createdDate
-where <idField("")> in (<ids: {id | :id_<i0>}; separator="," >)
-;
->>
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 03e743b..cbf00ff 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -64,15 +64,13 @@ public class MockPaymentDao implements PaymentDao {
}
@Override
- public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context) {
- int result = 0;
- synchronized (transactions) {
- for (PaymentTransactionModelDao cur : transactions.values()) {
- cur.setTransactionStatus(newTransactionStatus);
- result++;
+ public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+ return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+ @Override
+ public boolean apply(final PaymentTransactionModelDao input) {
+ return input.getTransactionStatus() == transactionStatus;
}
- }
- return result;
+ }));
}
@Override
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 0b1d03f..37e6e90 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -267,7 +267,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
"pending", "");
- paymentDao.insertPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
+ final PaymentModelDao payment = paymentDao.insertPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey2,
paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
@@ -299,7 +299,13 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
Assert.assertEquals(result.size(), 3);
- paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, newTime, internalCallContext);
+ final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, newTime);
+ for (PaymentTransactionModelDao paymentTransaction : transactions1) {
+ final String newPaymentState = "XXX_FAILED";
+ paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+ paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+ paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+ }
final List<PaymentTransactionModelDao> result2 = getPendingTransactions(paymentModelDao.getId());
Assert.assertEquals(result2.size(), 1);
@@ -311,7 +317,13 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
}
;
- paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, clock.getUTCNow(), internalCallContextWithNewTime);
+ final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, clock.getUTCNow());
+ for (PaymentTransactionModelDao paymentTransaction : transactions2) {
+ final String newPaymentState = "XXX_FAILED";
+ paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+ paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+ paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+ }
final List<PaymentTransactionModelDao> result3 = getPendingTransactions(paymentModelDao.getId());
Assert.assertEquals(result3.size(), 0);