killbill-uncached

Fixes #328

6/8/2015 7:45:29 PM

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
index 8bc737d..23aa0fa 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
@@ -21,48 +21,52 @@ import java.util.List;
 
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
 import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
-import org.killbill.billing.util.UUIDs;
-import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.clock.Clock;
 
-import com.google.common.collect.ImmutableList;
-
 /**
  * Task to find old PENDING transactions and move them into
  */
-final class PendingTransactionTask extends CompletionTaskBase<Integer> {
-
-    private final List<Integer> itemsForIterations;
+final class PendingTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
 
     public PendingTransactionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                   final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                   final RetryStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
                                   final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
         super(janitor, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
-        this.itemsForIterations = ImmutableList.of(new Integer(1));
     }
 
     @Override
-    public List<Integer> getItemsForIteration() {
-        return itemsForIterations;
+    public List<PaymentTransactionModelDao> getItemsForIteration() {
+        return paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, getCreatedDateBefore());
     }
 
     @Override
-    public void doIteration(final Integer item) {
-        final InternalCallContext contextTemplate = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingTransactionTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID());
-        int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), contextTemplate);
-        if (result > 0) {
-            log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments ->  PLUGIN_FAILURE");
-        }
+    public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
+        final CallContext callContext = createCallContext("PendingTransactionTask", internalTenantContext);
+        final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
+
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(payment.getAccountId(), callContext);
+
+        final String newPaymentState = paymentStateMachineHelper.getFailureStateForTransaction(paymentTransaction.getTransactionType());
+        paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+                                                           paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+                                                           paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+
+        log.info("Janitor PendingTransactionTask repairing payment {}, transaction {}", payment.getId(), paymentTransaction.getId());
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 75f8fe7..910db19 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -156,44 +156,17 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext internalCallContextTemplate) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Integer>() {
+    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
             @Override
-            public Integer inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+            public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                 final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
-                final List<PaymentTransactionModelDao> oldPendingTransactions = transactional.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING.toString(), createdBeforeDate.toDate());
-                if (oldPendingTransactions.size() > 0) {
-
-                    // Partition per tenant to compute a valid context
-                    final Map<Long, List<PaymentTransactionModelDao>> perTenantPendingTransactions = new HashMap<Long, List<PaymentTransactionModelDao>>();
-                    for (PaymentTransactionModelDao curPaymentTransactionModelDao : oldPendingTransactions) {
-                        List<PaymentTransactionModelDao> pendingTransactions = perTenantPendingTransactions.get(curPaymentTransactionModelDao.getTenantRecordId());
-                        if (pendingTransactions == null) {
-                            pendingTransactions = new LinkedList<PaymentTransactionModelDao>();
-                            perTenantPendingTransactions.put(curPaymentTransactionModelDao.getTenantRecordId(), pendingTransactions);
-                        }
-                        pendingTransactions.add(curPaymentTransactionModelDao);
-                    }
-
-                    int result = 0;
-                    for (final Long curTenantRecordId : perTenantPendingTransactions.keySet()) {
-                        final InternalCallContext validContext = new InternalCallContext(internalCallContextTemplate, -1L, curTenantRecordId);
-                        final Collection<String> perTenantPendingTransactionIds = Collections2.transform(perTenantPendingTransactions.get(curTenantRecordId), new Function<PaymentTransactionModelDao, String>() {
-                            @Nullable
-                            @Override
-                            public String apply(@Nullable final PaymentTransactionModelDao input) {
-                                return input.getId().toString();
-                            }
-                        });
-                        result += transactional.failOldPendingTransactions(perTenantPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), validContext);
-                    }
-                    return result;
-                }
-                return 0;
+                return transactional.getByTransactionStatusPriorDateAcrossTenants(transactionStatus.toString(), createdBeforeDate.toDate());
             }
         });
     }
 
+
     @Override
     public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 76ac6c8..8f95b2e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,7 +30,7 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public int failOldPendingTransactions(TransactionStatus newTransactionStatus, DateTime createdBeforeDate, InternalCallContext internalCallContextTemplate);
+    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index 2f1853f..19427e2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -56,12 +56,6 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
     List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@Bind("transactionStatus") final String transactionStatus,
                                                                                   @Bind("beforeCreatedDate") final Date beforeCreatedDate);
 
-    @SqlUpdate
-    @Audited(ChangeType.UPDATE)
-    int failOldPendingTransactions(@UUIDCollectionBinder final Collection<String> pendingTransactionIds,
-                                    @Bind("newTransactionStatus") final String newTransactionStatus,
-                                    @BindBean final InternalCallContext context);
-
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
                                                            @BindBean final InternalTenantContext context);
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index b14134f..9da6875 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -92,11 +92,3 @@ and created_date \< :beforeCreatedDate
 >>
 
 
-failOldPendingTransactions(ids) ::= <<
-update <tableName()>
-set transaction_status = :newTransactionStatus
-, updated_by = :updatedBy
-, updated_date = :createdDate
-where <idField("")> in (<ids: {id | :id_<i0>}; separator="," >)
-;
->>
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 03e743b..cbf00ff 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -64,15 +64,13 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context) {
-        int result = 0;
-        synchronized (transactions) {
-            for (PaymentTransactionModelDao cur : transactions.values()) {
-                cur.setTransactionStatus(newTransactionStatus);
-                result++;
+    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+        return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+            @Override
+            public boolean apply(final PaymentTransactionModelDao input) {
+                return input.getTransactionStatus() == transactionStatus;
             }
-        }
-        return result;
+        }));
     }
 
     @Override
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 0b1d03f..37e6e90 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -267,7 +267,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
                                                                                        TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
                                                                                        "pending", "");
 
-        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
+        final PaymentModelDao payment  = paymentDao.insertPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
 
         final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey2,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
@@ -299,7 +299,13 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result.size(), 3);
 
-        paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, newTime, internalCallContext);
+        final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, newTime);
+        for (PaymentTransactionModelDao paymentTransaction : transactions1) {
+            final String newPaymentState = "XXX_FAILED";
+            paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+                                                               paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+                                                               paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+        }
 
         final List<PaymentTransactionModelDao> result2 = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result2.size(), 1);
@@ -311,7 +317,13 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         }
         ;
 
-        paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, clock.getUTCNow(), internalCallContextWithNewTime);
+        final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, clock.getUTCNow());
+        for (PaymentTransactionModelDao paymentTransaction : transactions2) {
+            final String newPaymentState = "XXX_FAILED";
+            paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
+                                                               paymentTransaction.getId(), TransactionStatus.PAYMENT_FAILURE, paymentTransaction.getProcessedAmount(), paymentTransaction.getProcessedCurrency(),
+                                                               paymentTransaction.getGatewayErrorCode(), paymentTransaction.getGatewayErrorMsg(), internalCallContext);
+        }
 
         final List<PaymentTransactionModelDao> result3 = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result3.size(), 0);