killbill-memoizeit

Add a scheduled task in payment to take care of old PENDING transactions

7/8/2014 11:01:03 PM

Details

payment/pom.xml 4(+4 -0)

diff --git a/payment/pom.xml b/payment/pom.xml
index 41b6dee..d8fb8ba 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -150,6 +150,10 @@
         </dependency>
         <dependency>
             <groupId>org.kill-bill.commons</groupId>
+            <artifactId>killbill-concurrent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kill-bill.commons</groupId>
             <artifactId>killbill-embeddeddb-h2</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
new file mode 100644
index 0000000..02de4a5
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/Janitor.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core;
+
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.clock.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Takes care of incomplete payment/transactions.
+ */
+public class Janitor {
+
+    private final static Logger log = LoggerFactory.getLogger(Janitor.class);
+
+    private final ScheduledExecutorService janitorExecutor;
+    private final PaymentDao paymentDao;
+    private final Clock clock;
+    private final PaymentConfig paymentConfig;
+    private final InternalCallContextFactory internalCallContextFactory;
+
+    @Inject
+    public Janitor(final PaymentDao paymentDao,
+                   final PaymentConfig paymentConfig,
+                   final Clock clock,
+                   final InternalCallContextFactory internalCallContextFactory,
+                   @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor) {
+        this.paymentDao = paymentDao;
+        this.clock = clock;
+        this.paymentConfig = paymentConfig;
+        this.janitorExecutor = janitorExecutor;
+        this.internalCallContextFactory = internalCallContextFactory;
+    }
+
+    public void start() {
+        final TimeUnit rateUnit = paymentConfig.getJanitorRunningRate().getUnit();
+        final long period = paymentConfig.getJanitorRunningRate().getPeriod();
+        janitorExecutor.scheduleAtFixedRate(new PendingTransactionTask(), period, period, rateUnit);
+    }
+
+    public void stop() {
+        janitorExecutor.shutdown();
+    }
+
+    /**
+     * Task to find old PENDING transactions and move them into
+     */
+    private final class PendingTransactionTask implements Runnable {
+
+        private final InternalTenantContext fakeCallContext;
+
+        private PendingTransactionTask() {
+            this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, "PendingJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+        }
+
+        private DateTime getCreatedDateBefore() {
+            final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
+            return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+        }
+
+        @Override
+        public void run() {
+            paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+        }
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 9197bb5..d9cb89c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
+import org.joda.time.DateTime;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
@@ -34,6 +35,7 @@ import org.killbill.billing.entity.EntityPersistenceException;
 import org.killbill.billing.payment.api.DirectPayment;
 import org.killbill.billing.payment.api.PaymentMethod;
 import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Pagination;
@@ -46,6 +48,7 @@ import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import org.killbill.clock.Clock;
 import org.skife.jdbi.v2.IDBI;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
@@ -124,6 +127,28 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
+    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context) {
+         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+            @Override
+            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
+                final List<PaymentTransactionModelDao> oldPendingTransactions = transactional.getByTransactionStatusPriorDate(TransactionStatus.PENDING.toString(), createdBeforeDate.toDate(), context);
+                if (oldPendingTransactions.size() > 0) {
+                    final Collection<String> oldPendingTransactionIds = Collections2.transform(oldPendingTransactions, new Function<PaymentTransactionModelDao, String>() {
+                        @Override
+                        public String apply(final PaymentTransactionModelDao input) {
+                            return input.getId().toString();
+                        }
+                    });
+                    transactional.failOldPendingTransactions(oldPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), context);
+                }
+                return null;
+            }
+        });
+    }
+
+
+    @Override
     public List<PaymentTransactionModelDao> getDirectPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
             @Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index a41d336..a452ccd 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.UUID;
 
+import org.joda.time.DateTime;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
@@ -28,6 +29,8 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
+    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context);
+
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
     public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index e27dcdf..6e7ad8b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -17,6 +17,8 @@
 package org.killbill.billing.payment.dao;
 
 import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Date;
 import java.util.List;
 import java.util.UUID;
 
@@ -27,6 +29,7 @@ import org.killbill.billing.util.audit.ChangeType;
 import org.killbill.billing.util.entity.dao.Audited;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
+import org.killbill.billing.util.tag.dao.UUIDCollectionBinder;
 import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
@@ -50,8 +53,19 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
                                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
+    List<PaymentTransactionModelDao> getByTransactionStatusPriorDate(@Bind("transactionStatus") final String transactionStatus,
+                                                                     @Bind("beforeCreatedDate") final Date beforeCreatedDate,
+                                                                     @BindBean final InternalTenantContext context);
+
+    @SqlUpdate
+    @Audited(ChangeType.UPDATE)
+    void failOldPendingTransactions(@UUIDCollectionBinder final Collection<String> pendingTransactionIds,
+                                    @Bind("newTransactionStatus") final String newTransactionStatus,
+                                    @BindBean final InternalTenantContext context);
+
+    @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
-                                                                 @BindBean final InternalTenantContext context);
+                                                           @BindBean final InternalTenantContext context);
 }
 
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index 09f8861..eeb7319 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -22,6 +22,7 @@ import org.killbill.billing.payment.api.DirectPaymentApi;
 import org.killbill.billing.payment.api.PaymentService;
 import org.killbill.billing.payment.bus.InvoiceHandler;
 import org.killbill.billing.payment.control.PaymentTagHandler;
+import org.killbill.billing.payment.core.Janitor;
 import org.killbill.billing.payment.retry.DefaultRetryService;
 import org.killbill.billing.platform.api.LifecycleHandlerType;
 import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
@@ -44,18 +45,21 @@ public class DefaultPaymentService implements PaymentService {
     private final PersistentBus eventBus;
     private final DirectPaymentApi api;
     private final DefaultRetryService retryService;
+    private final Janitor janitor;
 
     @Inject
     public DefaultPaymentService(final InvoiceHandler invoiceHandler,
                                  final PaymentTagHandler tagHandler,
                                  final DirectPaymentApi api,
                                  final DefaultRetryService retryService,
-                                 final PersistentBus eventBus) {
+                                 final PersistentBus eventBus,
+                                 final Janitor janitor) {
         this.invoiceHandler = invoiceHandler;
         this.tagHandler = tagHandler;
         this.eventBus = eventBus;
         this.api = api;
         this.retryService = retryService;
+        this.janitor = janitor;
     }
 
     @Override
@@ -77,6 +81,7 @@ public class DefaultPaymentService implements PaymentService {
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
         retryService.start();
+        janitor.start();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
@@ -88,6 +93,7 @@ public class DefaultPaymentService implements PaymentService {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
         }
         retryService.stop();
+        janitor.stop();
     }
 
     @Override
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 27efa92..4074131 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -20,6 +20,7 @@ package org.killbill.billing.payment.glue;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 import javax.inject.Provider;
@@ -36,6 +37,7 @@ import org.killbill.billing.payment.bus.InvoiceHandler;
 import org.killbill.billing.payment.control.PaymentTagHandler;
 import org.killbill.billing.payment.control.dao.InvoicePaymentControlDao;
 import org.killbill.billing.payment.core.DirectPaymentProcessor;
+import org.killbill.billing.payment.core.Janitor;
 import org.killbill.billing.payment.core.PaymentGatewayProcessor;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PluginControlledPaymentProcessor;
@@ -63,6 +65,7 @@ public class PaymentModule extends KillBillModule {
 
     private static final String PLUGIN_THREAD_PREFIX = "Plugin-th-";
 
+    public static final String JANITOR_EXECUTOR_NAMED = "JanitorExecutor";
     public static final String PLUGIN_EXECUTOR_NAMED = "PluginExecutor";
     public static final String RETRYABLE_NAMED = "Retryable";
 
@@ -82,6 +85,12 @@ public class PaymentModule extends KillBillModule {
     protected void installPaymentProviderPlugins(final PaymentConfig config) {
     }
 
+    protected void installJanitor() {
+        final ScheduledExecutorService janitorExecutor = org.killbill.commons.concurrent.Executors.newSingleThreadScheduledExecutor("PaymentJanitor");
+        bind(ScheduledExecutorService.class).annotatedWith(Names.named(JANITOR_EXECUTOR_NAMED)).toInstance(janitorExecutor);
+
+        bind(Janitor.class).asEagerSingleton();
+    }
     protected void installRetryEngines() {
         bind(DefaultRetryService.class).asEagerSingleton();
         bind(RetryService.class).annotatedWith(Names.named(RETRYABLE_NAMED)).to(DefaultRetryService.class);
@@ -158,5 +167,6 @@ public class PaymentModule extends KillBillModule {
         installStateMachines();
         installAutomatonRunner();
         installRetryEngines();
+        installJanitor();
     }
 }
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index f42c7fb..4a6f916 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -77,3 +77,24 @@ where payment_id = :paymentId
 <defaultOrderBy()>
 ;
 >>
+
+/* Does not include AND_CHECK_TENANT() since this is a global operation */
+getByTransactionStatusPriorDate() ::= <<
+select <allTableFields()>
+from <tableName()>
+where transaction_status = :transactionStatus
+and created_date \< :beforeCreatedDate
+<defaultOrderBy()>
+;
+>>
+
+
+failOldPendingTransactions(ids) ::= <<
+update <tableName()>
+set transaction_status = :newTransactionStatus
+, updated_by = :updatedBy
+, updated_date = :createdDate
+where <idField("")> in (<ids: {id | :id_<i0>}; separator="," >)
+<defaultOrderBy()>
+;
+>>
\ No newline at end of file
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index ae583ad..6ea5e88 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.joda.time.DateTime;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
@@ -50,6 +51,15 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
+    public void failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalTenantContext context) {
+        synchronized (transactions) {
+            for (PaymentTransactionModelDao cur : transactions.values()) {
+                cur.setTransactionStatus(newTransactionStatus);
+            }
+        }
+    }
+
+    @Override
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(final PaymentAttemptModelDao attempt, final InternalCallContext context) {
         synchronized (this) {
             attempts.put(attempt.getId(), attempt);
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 8077815..359e00b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -21,16 +21,27 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
+
 import org.joda.time.DateTime;
+import org.killbill.billing.GuicyKillbillTestSuite;
+import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.payment.PaymentTestSuiteWithEmbeddedDB;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -49,18 +60,15 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
 
         final UUID accountId = UUID.randomUUID();
 
-
-
         final List<PluginProperty> properties = new ArrayList<PluginProperty>();
         properties.add(new PluginProperty("key1", "value1", false));
         properties.add(new PluginProperty("key2", "value2", false));
 
-        final byte [] serialized = PluginPropertySerializer.serialize(properties);
+        final byte[] serialized = PluginPropertySerializer.serialize(properties);
         final PaymentAttemptModelDao attempt = new PaymentAttemptModelDao(UUID.randomUUID(), UUID.randomUUID(), clock.getUTCNow(), clock.getUTCNow(),
                                                                           paymentExternalKey, directTransactionId, transactionExternalKey, transactionType, stateName,
                                                                           BigDecimal.ZERO, Currency.ALL, pluginName, serialized);
 
-
         PaymentAttemptModelDao savedAttempt = paymentDao.insertPaymentAttemptWithProperties(attempt, internalCallContext);
         assertEquals(savedAttempt.getTransactionExternalKey(), transactionExternalKey);
         assertEquals(savedAttempt.getTransactionType(), transactionType);
@@ -185,7 +193,6 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         paymentDao.updateDirectPaymentAndTransactionOnCompletion(savedPayment.getId(), "AUTH_ABORTED", null, transactionModelDao2.getId(), TransactionStatus.SUCCESS,
                                                                  BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
 
-
         final PaymentModelDao savedPayment4Again = paymentDao.getDirectPayment(savedPayment.getId(), internalCallContext);
         assertEquals(savedPayment4Again.getId(), paymentModelDao.getId());
         assertEquals(savedPayment4Again.getStateName(), "AUTH_ABORTED");
@@ -214,7 +221,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final String pluginName = "nobody";
         final Boolean isActive = Boolean.TRUE;
 
-        final PaymentMethodModelDao method = new PaymentMethodModelDao(paymentMethodId,UUID.randomUUID().toString(), null, null,
+        final PaymentMethodModelDao method = new PaymentMethodModelDao(paymentMethodId, UUID.randomUUID().toString(), null, null,
                                                                        accountId, pluginName, isActive);
 
         PaymentMethodModelDao savedMethod = paymentDao.insertPaymentMethod(method, internalCallContext);
@@ -243,4 +250,84 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(deletedPaymentMethod.getId(), paymentMethodId);
         assertEquals(deletedPaymentMethod.getPluginName(), pluginName);
     }
+
+    @Test(groups = "slow")
+    public void testPendingTransactions() {
+
+        final UUID paymentMethodId = UUID.randomUUID();
+        final UUID accountId = UUID.randomUUID();
+        final String externalKey = "hhhhooo";
+        final String transactionExternalKey1 = "transaction1";
+        final String transactionExternalKey2 = "transaction2";
+        final String transactionExternalKey3 = "transaction3";
+        final String transactionExternalKey4 = "transaction4";
+
+        final DateTime initialTime = clock.getUTCNow();
+
+        final PaymentModelDao paymentModelDao = new PaymentModelDao(initialTime, initialTime, accountId, paymentMethodId, externalKey);
+        final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey1,
+                                                                                       paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
+                                                                                       TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
+                                                                                       "pending", "");
+
+        paymentDao.insertDirectPaymentWithFirstTransaction(paymentModelDao, transaction1, internalCallContext);
+
+        final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey2,
+                                                                                       paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
+                                                                                       TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
+                                                                                       "pending", "");
+        paymentDao.updateDirectPaymentWithNewTransaction(paymentModelDao.getId(), transaction2, internalCallContext);
+
+        final PaymentTransactionModelDao transaction3 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey3,
+                                                                                       paymentModelDao.getId(), TransactionType.AUTHORIZE, initialTime,
+                                                                                       TransactionStatus.SUCCESS, BigDecimal.TEN, Currency.AED,
+                                                                                       "success", "");
+
+        paymentDao.updateDirectPaymentWithNewTransaction(paymentModelDao.getId(), transaction3, internalCallContext);
+
+        clock.addDays(1);
+        final DateTime newTime = clock.getUTCNow();
+
+
+        final InternalCallContext internalCallContextWithNewTime = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, 1687L, UUID.randomUUID(),
+                                                                                        UUID.randomUUID().toString(), CallOrigin.TEST,
+                                                                                        UserType.TEST, "Testing", "This is a test",
+                                                                                        newTime, newTime);
+
+        final PaymentTransactionModelDao transaction4 = new PaymentTransactionModelDao(initialTime, initialTime, transactionExternalKey4,
+                                                                                       paymentModelDao.getId(), TransactionType.AUTHORIZE, newTime,
+                                                                                       TransactionStatus.PENDING, BigDecimal.TEN, Currency.AED,
+                                                                                       "pending", "");
+        paymentDao.updateDirectPaymentWithNewTransaction(paymentModelDao.getId(), transaction4, internalCallContextWithNewTime);
+
+
+        final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
+        Assert.assertEquals(result.size(), 3);
+
+
+        paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, newTime, internalCallContext);
+
+        final List<PaymentTransactionModelDao> result2 = getPendingTransactions(paymentModelDao.getId());
+        Assert.assertEquals(result2.size(), 1);
+
+        // Just to guarantee that next clock.getUTCNow() > newTime
+        try { Thread.sleep(1000); } catch (InterruptedException e) {};
+
+        paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, clock.getUTCNow(), internalCallContextWithNewTime);
+
+        final List<PaymentTransactionModelDao> result3 = getPendingTransactions(paymentModelDao.getId());
+        Assert.assertEquals(result3.size(), 0);
+
+    }
+
+    private List<PaymentTransactionModelDao> getPendingTransactions(final UUID paymentId) {
+        final List<PaymentTransactionModelDao> total =  paymentDao.getDirectTransactionsForDirectPayment(paymentId, internalCallContext);
+        return ImmutableList.copyOf(Iterables.filter(total, new Predicate<PaymentTransactionModelDao>() {
+            @Override
+            public boolean apply(final PaymentTransactionModelDao input) {
+                return input.getTransactionStatus() == TransactionStatus.PENDING;
+            }
+        }));
+    }
 }
+
diff --git a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
index 4dde839..67f5014 100644
--- a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
@@ -65,6 +65,16 @@ public interface PaymentConfig extends KillbillConfig {
     @Description("Number of threads for plugin executor dispatcher")
     public int getPaymentPluginThreadNb();
 
+    @Config("org.killbill.payment.janitor.pending")
+    @Default("12h")
+    @Description("Delay after which pending transactions should be marked as failed")
+    public TimeSpan getJanitorPendingCleanupTime();
+
+    @Config("org.killbill.payment.janitor.rate")
+    @Default("1h")
+    @Description("Rate at which janitor tasks are scheduled")
+    public TimeSpan getJanitorRunningRate();
+
     @Config("org.killbill.payment.off")
     @Default("false")
     @Description("Whether the payment subsystem is off")
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoWrapperInvocationHandler.java b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoWrapperInvocationHandler.java
index 240fe5b..456bf95 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoWrapperInvocationHandler.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoWrapperInvocationHandler.java
@@ -23,12 +23,14 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Type;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.killbill.billing.util.tag.dao.UUIDCollectionBinder;
 import org.skife.jdbi.v2.Binding;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.exceptions.DBIException;
@@ -330,19 +332,14 @@ public class EntitySqlDaoWrapperInvocationHandler<S extends EntitySqlDao<M, E>, 
                 }
             }
 
-            // Otherwise, use the first String argument, annotated with @Bind("id")
-            // This is true for e.g. update calls
-            if (!(arg instanceof String)) {
-                continue;
-            }
-
             for (final Annotation annotation : parameterAnnotations[i]) {
-                if (Bind.class.equals(annotation.annotationType()) && ("id").equals(((Bind) annotation).value())) {
+                if (arg instanceof String && Bind.class.equals(annotation.annotationType()) && ("id").equals(((Bind) annotation).value())) {
                     return ImmutableList.<String>of((String) arg);
+                } else if (arg instanceof Collection && UUIDCollectionBinder.class.equals(annotation.annotationType())) {
+                    return ImmutableList.<String>copyOf((Collection) arg);
                 }
             }
         }
-
         return null;
     }
 
diff --git a/util/src/main/java/org/killbill/billing/util/tag/dao/UUIDCollectionBinder.java b/util/src/main/java/org/killbill/billing/util/tag/dao/UUIDCollectionBinder.java
index db4df65..b5f431d 100644
--- a/util/src/main/java/org/killbill/billing/util/tag/dao/UUIDCollectionBinder.java
+++ b/util/src/main/java/org/killbill/billing/util/tag/dao/UUIDCollectionBinder.java
@@ -39,7 +39,7 @@ public @interface UUIDCollectionBinder {
 
                 @Override
                 public void bind(SQLStatement<?> query, UUIDCollectionBinder bind, Collection<String> ids) {
-                    query.define("tag_definition_ids", ids);
+                    query.define("ids", ids);
 
                     int idx = 0;
                     for (String id : ids) {
diff --git a/util/src/main/resources/org/killbill/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg b/util/src/main/resources/org/killbill/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
index 08e0800..e313b15 100644
--- a/util/src/main/resources/org/killbill/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
+++ b/util/src/main/resources/org/killbill/billing/util/tag/dao/TagDefinitionSqlDao.sql.stg
@@ -58,12 +58,12 @@ and t.is_active
 ;
 >>
 
-getByIds(tag_definition_ids) ::= <<
+getByIds(ids) ::= <<
 select
   <allTableFields("t.")>
 from <tableName()> t
 where t.is_active
-and <idField("t.")> in (<tag_definition_ids: {id | :id_<i0>}; separator="," >)
+and <idField("t.")> in (<ids: {id | :id_<i0>}; separator="," >)
 <AND_CHECK_TENANT("t.")>
 ;
 >>