killbill-uncached

Rework some of the janitor tasks after code review: Merge both

6/12/2015 10:59:45 PM

Changes

payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingPaymentTransactionTask.java 74(+0 -74)

payment/src/main/java/org/killbill/billing/payment/core/janitor/UnknownPaymentTransactionTask.java 161(+0 -161)

pom.xml 2(+1 -1)

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index 4d66815..1073e71 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -106,8 +106,4 @@ abstract class CompletionTaskBase<T> implements Runnable {
         return callContext;
     }
 
-    protected DateTime getCreatedDateBefore() {
-        final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
-        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
-    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index d58a5e5..64220d0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -19,6 +19,7 @@ package org.killbill.billing.payment.core.janitor;
 
 import java.util.List;
 
+import org.joda.time.DateTime;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
@@ -124,4 +125,10 @@ final class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttem
             log.warn("Janitor AttemptCompletionTask failed to complete payment attempt " + attempt.getId(), e);
         }
     }
+
+    private DateTime getCreatedDateBefore() {
+        final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
+        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+    }
+
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
new file mode 100644
index 0000000..4edf27f
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.PaymentTransactionInfoPluginConverter;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
+import org.killbill.billing.payment.core.sm.PluginRoutingPaymentAutomatonRunner;
+import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
+import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentMethodModelDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApiException;
+import org.killbill.billing.payment.plugin.api.PaymentTransactionInfoPlugin;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.clock.Clock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class IncompletePaymentTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
+
+    private final static ImmutableList<TransactionStatus> TRANSACTION_STATUSES_TO_CONSIDER = ImmutableList.<TransactionStatus>builder()
+                                                                                                          .add(TransactionStatus.PENDING)
+                                                                                                          .add(TransactionStatus.UNKNOWN)
+                                                                                                          .build();
+    private final static int MAX_ITEMS_PER_LOOP = 100;
+
+
+    public IncompletePaymentTransactionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
+                                            final PaymentDao paymentDao, final Clock clock,
+                                            final PaymentStateMachineHelper paymentStateMachineHelper, final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
+                                            final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+        super(janitor, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+    }
+
+    @Override
+    public List<PaymentTransactionModelDao> getItemsForIteration() {
+        final List<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), MAX_ITEMS_PER_LOOP);
+        if (!result.isEmpty()) {
+            log.info("Janitor IncompletePaymentTransactionTask start run: found {} errored/unknown payments", result.size());
+        }
+        return result;
+    }
+
+    @Override
+    public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
+        final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
+        final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
+
+
+        final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
+        final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
+
+        PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
+        try {
+            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), callContext);
+            paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
+                @Override
+                public boolean apply(final PaymentTransactionInfoPlugin input) {
+                    return input.getKbTransactionPaymentId().equals(paymentTransaction.getId());
+                }
+            }).orNull();
+        } catch (final PaymentPluginApiException ignored) {
+            // The paymentTransactionInfoPlugin will be null and handled below as if the plugin did not know about that transaction
+            paymentTransactionInfoPlugin = null;
+        }
+
+        //
+        // First obtain the new transactionStatus,
+        // Then compute the new paymentState; this one is mostly interesting in case of success (to compute the lastSuccessPaymentState below)
+        //
+        final TransactionStatus transactionStatus = computeNewTransactionStatusFromPaymentTransactionInfoPlugin(paymentTransactionInfoPlugin, paymentTransaction.getTransactionStatus());
+        final String newPaymentState;
+        switch (transactionStatus) {
+            case PENDING:
+                newPaymentState = paymentStateMachineHelper.getPendingStateForTransaction(paymentTransaction.getTransactionType());
+                break;
+            case SUCCESS:
+                newPaymentState = paymentStateMachineHelper.getSuccessfulStateForTransaction(paymentTransaction.getTransactionType());
+                break;
+            case PAYMENT_FAILURE:
+                newPaymentState = paymentStateMachineHelper.getFailureStateForTransaction(paymentTransaction.getTransactionType());
+                break;
+            case PLUGIN_FAILURE:
+            case UNKNOWN:
+            default:
+                // In this case we don't try to update the current paymentState (since we could not get anything interesting from the plugin)
+                newPaymentState = payment.getStateName();
+                break;
+        }
+
+        // Recompute new lastSuccessPaymentState. This is important to be able to allow new operations on the state machine (for e.g an AUTH_SUCCESS would now allow a CAPTURE operation)
+        final String lastSuccessPaymentState = paymentStateMachineHelper.isSuccessState(newPaymentState) ? newPaymentState : null;
+
+        // Update the processedAmount, processedCurrency if we got a paymentTransactionInfoPlugin from the plugin and if this is a non error state
+        final BigDecimal processedAmount = (paymentTransactionInfoPlugin != null && isPendingOrFinalTransactionStatus(transactionStatus)) ?
+                                           paymentTransactionInfoPlugin.getAmount() : paymentTransaction.getProcessedAmount();
+        final Currency processedCurrency = (paymentTransactionInfoPlugin != null && isPendingOrFinalTransactionStatus(transactionStatus)) ?
+                                           paymentTransactionInfoPlugin.getCurrency() : paymentTransaction.getProcessedCurrency();
+
+        // Update the gatewayErrorCode, gatewayError if we got a paymentTransactionInfoPlugin
+        final String gatewayErrorCode = paymentTransactionInfoPlugin != null ? paymentTransactionInfoPlugin.getGatewayErrorCode() : paymentTransaction.getGatewayErrorCode();
+        final String gatewayError = paymentTransactionInfoPlugin != null ? paymentTransactionInfoPlugin.getGatewayError() : paymentTransaction.getGatewayErrorMsg();
+
+        log.info("Janitor IncompletePaymentTransactionTask repairing payment {}, transaction {}", payment.getId(), paymentTransaction.getId());
+
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(payment.getAccountId(), callContext);
+        paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, lastSuccessPaymentState,
+                                                           paymentTransaction.getId(), transactionStatus, processedAmount, processedCurrency, gatewayErrorCode, gatewayError, internalCallContext);
+
+    }
+
+    // Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
+    private TransactionStatus computeNewTransactionStatusFromPaymentTransactionInfoPlugin(@Nullable PaymentTransactionInfoPlugin input, final TransactionStatus currentTransactionStatus) {
+        final TransactionStatus newTransactionStatus = PaymentTransactionInfoPluginConverter.toTransactionStatus(input);
+        if (newTransactionStatus == TransactionStatus.PLUGIN_FAILURE || newTransactionStatus ==  TransactionStatus.UNKNOWN) {
+            return currentTransactionStatus;
+        } else {
+            return newTransactionStatus;
+        }
+    }
+
+    private boolean isPendingOrFinalTransactionStatus(final TransactionStatus transactionStatus) {
+        return (transactionStatus == TransactionStatus.PENDING ||
+                transactionStatus == TransactionStatus.SUCCESS ||
+                transactionStatus == TransactionStatus.PAYMENT_FAILURE);
+    }
+
+    private PaymentPluginApi getPaymentPluginApi(final PaymentModelDao item, final String pluginName) {
+        final PaymentPluginApi pluginApi = pluginRegistry.getServiceForName(pluginName);
+        Preconditions.checkState(pluginApi != null, "Janitor IncompletePaymentTransactionTask cannot retrieve PaymentPluginApi " + item.getId() + ", skipping");
+        return pluginApi;
+    }
+
+    private DateTime getCreatedDateBefore() {
+        final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanDelay().getMillis();
+        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+    }
+
+
+    private DateTime getCreatedDateAfter() {
+        final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanGiveup().getMillis();
+        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index b202a5d..73eddf5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -48,9 +48,8 @@ public class Janitor {
 
     private final ScheduledExecutorService janitorExecutor;
     private final PaymentConfig paymentConfig;
-    private final PendingPaymentTransactionTask pendingPaymentTransactionTask;
     private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
-    private final UnknownPaymentTransactionTask erroredPaymentCompletionTask;
+    private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
 
     private volatile boolean isStopped;
 
@@ -67,11 +66,9 @@ public class Janitor {
                    final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
         this.janitorExecutor = janitorExecutor;
         this.paymentConfig = paymentConfig;
-        this.pendingPaymentTransactionTask = new PendingPaymentTransactionTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
-                                                                 accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
         this.incompletePaymentAttemptTask = new IncompletePaymentAttemptTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
                                                                accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
-        this.erroredPaymentCompletionTask = new UnknownPaymentTransactionTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
+        this.incompletePaymentTransactionTask = new IncompletePaymentTransactionTask(this, internalCallContextFactory, paymentConfig, paymentDao, clock, paymentSMHelper, retrySMHelper,
                                                                    accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
         this.isStopped = false;
     }
@@ -82,11 +79,6 @@ public class Janitor {
             return;
         }
 
-        // Start task for removing old pending payments.
-        final TimeUnit pendingRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
-        final long pendingPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
-        janitorExecutor.scheduleAtFixedRate(pendingPaymentTransactionTask, pendingPeriod, pendingPeriod, pendingRateUnit);
-
         // Start task for completing incomplete payment attempts
         final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
@@ -95,7 +87,7 @@ public class Janitor {
         // Start task for completing incomplete payment attempts
         final TimeUnit erroredCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long erroredCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
-        janitorExecutor.scheduleAtFixedRate(erroredPaymentCompletionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
+        janitorExecutor.scheduleAtFixedRate(incompletePaymentTransactionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
     }
 
     public void stop() {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 910db19..c1f1203 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -156,12 +156,18 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
             @Override
             public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                 final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
-                return transactional.getByTransactionStatusPriorDateAcrossTenants(transactionStatus.toString(), createdBeforeDate.toDate());
+                final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, new Function<TransactionStatus, String>() {
+                    @Override
+                    public String apply(final TransactionStatus input) {
+                        return input.toString();
+                    }
+                }));
+                 return transactional.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBeforeDate.toDate(), createdAfterDate.toDate(), limit);
             }
         });
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 8f95b2e..2ac7ea1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,7 +30,7 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate);
+    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index 19427e2..d291ca0 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -29,7 +29,6 @@ import org.killbill.billing.util.audit.ChangeType;
 import org.killbill.billing.util.entity.dao.Audited;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
-import org.killbill.billing.util.tag.dao.UUIDCollectionBinder;
 import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
@@ -53,8 +52,10 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
                                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@Bind("transactionStatus") final String transactionStatus,
-                                                                                  @Bind("beforeCreatedDate") final Date beforeCreatedDate);
+    List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+                                                                                  @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                                                  @Bind("createdAfterDate") final Date createdAfterDate,
+                                                                                  @Bind("limit") final int limit);
 
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionStatusCollectionBinder.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionStatusCollectionBinder.java
new file mode 100644
index 0000000..cd8973d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionStatusCollectionBinder.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.dao;
+
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Collection;
+
+import org.killbill.billing.payment.dao.TransactionStatusCollectionBinder.TransactionStatusCollectionBinderFactory;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.BinderFactory;
+import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
+
+@BindingAnnotation(TransactionStatusCollectionBinderFactory.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.PARAMETER})
+public @interface TransactionStatusCollectionBinder {
+
+    public static class TransactionStatusCollectionBinderFactory implements BinderFactory {
+
+        @Override
+        public Binder build(Annotation annotation) {
+            return new Binder<TransactionStatusCollectionBinder, Collection<String>>() {
+
+                @Override
+                public void bind(SQLStatement<?> query, TransactionStatusCollectionBinder bind, Collection<String> allTransactionStatus) {
+                    query.define("statuses", allTransactionStatus);
+
+                    int idx = 0;
+                    for (String transactionStatus : allTransactionStatus) {
+                        query.bind("status_" + idx, transactionStatus);
+                        idx++;
+                    }
+                }
+            };
+        }
+    }
+}
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
index e17fdff..0d8d592 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -108,7 +108,7 @@ select
 from <tableName()> t
 where
 created_date >= :createdAfterDate
-and created_date \<= :createdBeforeDate
+and created_date \< :createdBeforeDate
 and state_name in (<states: {state | :state_<i0>}; separator="," >)
 limit :limit
 ;
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index 9da6875..1d54780 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -82,13 +82,15 @@ where payment_id = :paymentId
 
 
 /* Does not include AND_CHECK_TENANT() since this is a global operation */
-getByTransactionStatusPriorDateAcrossTenants() ::= <<
+getByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
 select <allTableFields()>
 from <tableName()>
-where transaction_status = :transactionStatus
-and created_date \< :beforeCreatedDate
+where
+created_date >= :createdAfterDate
+and created_date \< :createdBeforeDate
+and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
 <defaultOrderBy()>
+limit :limit
 ;
 >>
 
-
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index cbf00ff..21aee20 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -64,11 +64,16 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(final TransactionStatus transactionStatus, final DateTime createdBeforeDate) {
+    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit) {
         return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
-                return input.getTransactionStatus() == transactionStatus;
+                return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
+                    @Override
+                    public boolean apply(final TransactionStatus transactionStatus) {
+                        return input.getTransactionStatus() == transactionStatus;
+                    }
+                });
             }
         }));
     }
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 37e6e90..358cf2b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -259,7 +259,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final String transactionExternalKey3 = "transaction3";
         final String transactionExternalKey4 = "transaction4";
 
-        final DateTime initialTime = clock.getUTCNow();
+        final DateTime initialTime = clock.getUTCNow().minusMinutes(1);
 
         final PaymentModelDao paymentModelDao = new PaymentModelDao(initialTime, initialTime, accountId, paymentMethodId, externalKey);
         final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey1,
@@ -284,7 +284,6 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
 
         clock.addDays(1);
         final DateTime newTime = clock.getUTCNow();
-
         final InternalCallContext internalCallContextWithNewTime = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, 1687L, UUID.randomUUID(),
                                                                                            UUID.randomUUID().toString(), CallOrigin.TEST,
                                                                                            UserType.TEST, "Testing", "This is a test",
@@ -299,7 +298,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result.size(), 3);
 
-        final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, newTime);
+        final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 3);
         for (PaymentTransactionModelDao paymentTransaction : transactions1) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -317,7 +316,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         }
         ;
 
-        final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusPriorDateAcrossTenants(TransactionStatus.PENDING, clock.getUTCNow());
+        final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 1);
         for (PaymentTransactionModelDao paymentTransaction : transactions2) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index 57e2235..986106b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -235,7 +235,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
-        // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* (in the last SAFETY_DELAY_MS=3min delay), and that are not too old (less than 3 days)
+        // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* , and that are not too old (less than 7 days)
         clock.addDays(1);
         try {
             Thread.sleep(1500);
@@ -271,7 +271,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         }
 
         final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
-        Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.PAYMENT_FAILURE);
+        Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
 
     }
 

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index 767de1a..ffe30ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill-oss-parent</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.13-SNAPSHOT</version>
+        <version>0.14</version>
     </parent>
     <artifactId>killbill</artifactId>
     <version>0.15.0-SNAPSHOT</version>
diff --git a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
index 4c90b6e..e0fb8bf 100644
--- a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
@@ -61,15 +61,21 @@ public interface PaymentConfig extends KillbillConfig {
     @Description("Number of threads for plugin executor dispatcher")
     public int getPaymentPluginThreadNb();
 
-    @Config("org.killbill.payment.janitor.pending")
+    @Config("org.killbill.payment.janitor.attempts.delay")
     @Default("12h")
-    @Description("Delay after which pending transactions should be marked as failed")
-    public TimeSpan getJanitorPendingCleanupTime();
+    @Description("Delay before which unresolved attempt should be retried")
+    public TimeSpan getIncompleteAttemptsTimeSpanDelay();
 
-    @Config("org.killbill.payment.janitor.attempts")
-    @Default("15m")
-    @Description("Delay after which incomplete  attempts should be completed")
-    public TimeSpan getJanitorAttemptCompletionTime();
+    @Config("org.killbill.payment.janitor.transactions.delay")
+    @Default("3m")
+    @Description("Delay before which unresolved transactions should be retried")
+    public TimeSpan getIncompleteTransactionsTimeSpanDelay();
+
+
+    @Config("org.killbill.payment.janitor.transactions.giveup")
+    @Default("7d")
+    @Description("Delay after which unresolved transactions should be abandoned")
+    public TimeSpan getIncompleteTransactionsTimeSpanGiveup();
 
     @Config("org.killbill.payment.janitor.rate")
     @Default("1h")