killbill-memoizeit

Implement new Janitor task to move payment/transaction in

9/22/2014 8:11:53 PM

Changes

Details

diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/AttemptCompletionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/AttemptCompletionTask.java
index dbd0b50..5714907 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/AttemptCompletionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/AttemptCompletionTask.java
@@ -18,17 +18,17 @@
 package org.killbill.billing.payment.core.janitor;
 
 import java.util.List;
-import java.util.UUID;
 
 import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
 import org.killbill.billing.payment.core.sm.RetryablePaymentStateContext;
@@ -37,12 +37,10 @@ import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
 import org.killbill.billing.payment.dao.PluginPropertySerializer;
 import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertySerializerException;
-import org.killbill.billing.util.cache.Cachable.CacheType;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.CallContext;
-import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.clock.Clock;
@@ -58,27 +56,24 @@ import com.google.common.collect.Iterables;
  */
 final class AttemptCompletionTask extends CompletionTaskBase<PaymentAttemptModelDao> {
 
-
     public AttemptCompletionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
-                                 final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock,
+                                 final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                  final RetryStateMachineHelper retrySMHelper, final CacheControllerDispatcher controllerDispatcher, final AccountInternalApi accountInternalApi,
-                                 final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner) {
-        super(janitor, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, retrySMHelper, controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner);
+                                 final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+        super(janitor, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
     }
 
     @Override
     public List<PaymentAttemptModelDao> getItemsForIteration() {
-        final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), fakeCallContext);
+        final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByState(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), completionTaskCallContext);
         log.info("Janitor AttemptCompletionTask start run : found " + incompleteAttempts.size() + " incomplete attempts");
         return incompleteAttempts;
     }
 
     @Override
     public void doIteration(final PaymentAttemptModelDao attempt) {
-        // STEPH seems a bit insane??
         final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getAccountId(), attempt.getId(), ObjectType.PAYMENT_ATTEMPT);
-        final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT, controllerDispatcher.getCacheController(CacheType.OBJECT_ID));
-        final CallContext callContext = new DefaultCallContext(tenantId, "AttemptCompletionJanitorTask", CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID(), clock);
+        final CallContext callContext = createCallContext("AttemptCompletionJanitorTask", tenantContext);
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);
 
         final List<PaymentTransactionModelDao> transactions = paymentDao.getPaymentTransactionsByExternalKey(attempt.getTransactionExternalKey(), tenantContext);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index c019395..9f68a5e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -21,12 +21,20 @@ import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
+import org.killbill.billing.util.cache.Cachable.CacheType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
@@ -42,35 +50,39 @@ abstract class CompletionTaskBase<T> implements Runnable {
 
     private Janitor janitor;
     private final String taskName;
-    private final PaymentConfig paymentConfig;
 
+    protected final PaymentConfig paymentConfig;
     protected final Clock clock;
     protected final PaymentDao paymentDao;
-    protected final InternalCallContext fakeCallContext;
+    protected final InternalCallContext completionTaskCallContext;
     protected final InternalCallContextFactory internalCallContextFactory;
     protected final NonEntityDao nonEntityDao;
+    protected final PaymentStateMachineHelper paymentStateMachineHelper;
     protected final RetryStateMachineHelper retrySMHelper;
     protected final CacheControllerDispatcher controllerDispatcher;
     protected final AccountInternalApi accountInternalApi;
     protected final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
+    protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
 
 
     public CompletionTaskBase(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
-                              final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock,
+                              final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                               final RetryStateMachineHelper retrySMHelper, final CacheControllerDispatcher controllerDispatcher, final AccountInternalApi accountInternalApi,
-                              final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner) {
+                              final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
         this.janitor = janitor;
         this.internalCallContextFactory = internalCallContextFactory;
         this.paymentConfig = paymentConfig;
         this.nonEntityDao = nonEntityDao;
         this.paymentDao = paymentDao;
         this.clock = clock;
+        this.paymentStateMachineHelper = paymentStateMachineHelper;
         this.retrySMHelper = retrySMHelper;
         this.controllerDispatcher = controllerDispatcher;
         this.accountInternalApi = accountInternalApi;
         this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
+        this.pluginRegistry = pluginRegistry;
         this.taskName = this.getClass().getName();
-        this.fakeCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
+        this.completionTaskCallContext = internalCallContextFactory.createInternalCallContext((Long) null, (Long) null, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID());
     }
 
     @Override
@@ -86,7 +98,11 @@ abstract class CompletionTaskBase<T> implements Runnable {
                 log.info("Janitor Task " + taskName + " was requested to stop");
                 return;
             }
-            doIteration(item);
+            try {
+                doIteration(item);
+            } catch(IllegalStateException e) {
+                log.warn(e.getMessage());
+            }
         }
     }
 
@@ -94,6 +110,13 @@ abstract class CompletionTaskBase<T> implements Runnable {
 
     public abstract void doIteration(final T item);
 
+    protected CallContext createCallContext(final String taskName, final InternalTenantContext tenantContext) {
+        final UUID tenantId = nonEntityDao.retrieveIdFromObject(tenantContext.getTenantRecordId(), ObjectType.TENANT, controllerDispatcher.getCacheController(CacheType.OBJECT_ID));
+        final CallContext callContext = new DefaultCallContext(tenantId, taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUID.randomUUID(), clock);
+        return callContext;
+    }
+
+
     protected DateTime getCreatedDateBefore() {
         final long delayBeforeNowMs = paymentConfig.getJanitorPendingCleanupTime().getMillis();
         return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/ErroredPaymentTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/ErroredPaymentTask.java
new file mode 100644
index 0000000..fd6d9f3
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/ErroredPaymentTask.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.killbill.automaton.StateMachine;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.sm.PaymentEnteringStateCallback;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
+import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
+import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
+import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.dao.PaymentMethodModelDao;
+import org.killbill.billing.payment.dao.PaymentModelDao;
+import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApiException;
+import org.killbill.billing.payment.plugin.api.PaymentTransactionInfoPlugin;
+import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.clock.Clock;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class ErroredPaymentTask extends CompletionTaskBase<PaymentModelDao> {
+
+    // We could configure all that if this becomes useful but we also want to avoid a flurry of parameters.
+    private static final int SAFETY_DELAY_MS = (3 * 60 * 1000); // 3 minutes
+    private final int OLDER_PAYMENTS_IN_DAYS = 3; // don't look at ERRORED payment older than 3 days
+    private final int MAX_ITEMS_PER_LOOP = 100; // Limit of items per iteration
+
+    public ErroredPaymentTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
+                                 final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock,
+                                 final PaymentStateMachineHelper paymentStateMachineHelper, final RetryStateMachineHelper retrySMHelper, final CacheControllerDispatcher controllerDispatcher, final AccountInternalApi accountInternalApi,
+                                 final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+        super(janitor, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+    }
+
+    @Override
+    public List<PaymentModelDao> getItemsForIteration() {
+        // In theory this should be the plugin timeout but we add a 3 minutes delay for safety.
+        int delayBeforeNow = (int) paymentConfig.getPaymentPluginTimeout().getMillis() + SAFETY_DELAY_MS;
+        final DateTime createdBeforeDate = clock.getUTCNow().minusMillis(delayBeforeNow);
+
+        // We want to avoid iterating on the same failed payments -- if for some reasons they can't fix themselves.
+        final DateTime createdAfterDate = clock.getUTCNow().minusDays(OLDER_PAYMENTS_IN_DAYS);
+
+        final List<PaymentModelDao> result = paymentDao.getPaymentsByStates(paymentStateMachineHelper.getErroredStateNames(), createdBeforeDate, createdAfterDate, MAX_ITEMS_PER_LOOP, completionTaskCallContext);
+        return result;
+    }
+
+    @Override
+    public void doIteration(final PaymentModelDao item) {
+
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(item.getAccountId(), item.getId(), ObjectType.PAYMENT);
+        final CallContext callContext = createCallContext("ErroredPaymentTask", internalTenantContext);
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(item.getAccountId(), callContext);
+
+        final List<PaymentTransactionModelDao> transactions = paymentDao.getTransactionsForPayment(item.getId(), internalTenantContext);
+        Preconditions.checkState(! transactions.isEmpty(), "Janitor ErroredPaymentTask found item " + item.getId() + " with no transactions, skipping");
+
+        // We look for latest transaction in an UNKNOWN state, if not we skip
+        final PaymentTransactionModelDao unknownTransaction = transactions.get(transactions.size() - 1);
+        if (unknownTransaction.getTransactionStatus() != TransactionStatus.UNKNOWN) {
+            return;
+        }
+
+        final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(item.getPaymentMethodId(), internalCallContext);
+        final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(item, paymentMethod.getPluginName());
+
+
+        PaymentTransactionInfoPlugin pluginErroredTransaction = null;
+        try {
+            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(item.getAccountId(), item.getId(), ImmutableList.<PluginProperty>of(), callContext);
+
+            pluginErroredTransaction = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
+                @Override
+                public boolean apply(@Nullable final PaymentTransactionInfoPlugin input) {
+                    return input.getKbTransactionPaymentId().equals(unknownTransaction.getId());
+                }
+            }).orNull();
+        } catch (PaymentPluginApiException ignored) {
+
+        }
+
+        // Compute new transactionStatus based on pluginInfo state; and if that did not change, bail early.
+        final TransactionStatus transactionStatus = PaymentEnteringStateCallback.paymentPluginStatusToTransactionStatus(pluginErroredTransaction);
+        if (transactionStatus == unknownTransaction.getTransactionStatus()) {
+            return;
+        }
+
+        // This piece of logic is obviously outside of the state machine, and this is a bit of a hack; at least all the paymentStates internal config is
+        // kept into paymentStateMachineHelper.
+        final String newPaymentState;
+        switch (transactionStatus) {
+            case PENDING:
+                newPaymentState = paymentStateMachineHelper.getPendingStateForTransaction(unknownTransaction.getTransactionType());
+                break;
+            case SUCCESS:
+                newPaymentState = paymentStateMachineHelper.getSuccessfulStateForTransaction(unknownTransaction.getTransactionType());
+                break;
+            case PAYMENT_FAILURE:
+                newPaymentState = paymentStateMachineHelper.getFailureStateForTransaction(unknownTransaction.getTransactionType());
+                break;
+            case PLUGIN_FAILURE:
+            case UNKNOWN:
+            default:
+                newPaymentState = paymentStateMachineHelper.getErroredStateForTransaction(unknownTransaction.getTransactionType());
+                break;
+        }
+        final String lastSuccessPaymentState = paymentStateMachineHelper.isSuccessState(newPaymentState) ? newPaymentState : null;
+
+
+        final BigDecimal processedAmount = pluginErroredTransaction != null ?  pluginErroredTransaction.getAmount() : null;
+        final Currency processedCurrency = pluginErroredTransaction != null ? pluginErroredTransaction.getCurrency() : null;
+        final String gatewayErrorCode = pluginErroredTransaction != null ? pluginErroredTransaction.getGatewayErrorCode() : null;
+        final String gatewayError = pluginErroredTransaction != null ? pluginErroredTransaction.getGatewayError() : null;
+
+        paymentDao.updatePaymentAndTransactionOnCompletion(item.getAccountId(), item.getId(), unknownTransaction.getTransactionType(), newPaymentState, lastSuccessPaymentState,
+                                                           unknownTransaction.getId(), transactionStatus, processedAmount, processedCurrency, gatewayErrorCode, gatewayError, internalCallContext);
+
+    }
+
+    private PaymentPluginApi getPaymentPluginApi(final PaymentModelDao item, final String pluginName) {
+        final PaymentPluginApi pluginApi = pluginRegistry.getServiceForName(pluginName);
+        Preconditions.checkState(pluginApi != null, "Janitor ErroredPaymentTask cannot retrieve PaymentPluginApi " + item.getId() + ", skipping");
+        return pluginApi;
+    }
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index f176745..0bc4c0e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -24,10 +24,13 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
 import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
@@ -49,6 +52,7 @@ public class Janitor {
     private final PaymentConfig paymentConfig;
     private final PendingTransactionTask pendingTransactionTask;
     private final AttemptCompletionTask attemptCompletionTask;
+    private final ErroredPaymentTask erroredPaymentCompletionTask;
 
     private volatile boolean isStopped;
 
@@ -61,14 +65,18 @@ public class Janitor {
                    final InternalCallContextFactory internalCallContextFactory,
                    final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
                    @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
+                   final PaymentStateMachineHelper paymentSMHelper,
                    final RetryStateMachineHelper retrySMHelper,
-                   final CacheControllerDispatcher controllerDispatcher) {
+                   final CacheControllerDispatcher controllerDispatcher,
+                   final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
         this.janitorExecutor = janitorExecutor;
         this.paymentConfig = paymentConfig;
-        this.pendingTransactionTask = new PendingTransactionTask(this, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, retrySMHelper,
-                                                                 controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner);
-        this.attemptCompletionTask = new AttemptCompletionTask(this, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, retrySMHelper,
-                                                                 controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner);
+        this.pendingTransactionTask = new PendingTransactionTask(this, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentSMHelper, retrySMHelper,
+                                                                 controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+        this.attemptCompletionTask = new AttemptCompletionTask(this, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentSMHelper, retrySMHelper,
+                                                               controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
+        this.erroredPaymentCompletionTask = new ErroredPaymentTask(this, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentSMHelper, retrySMHelper,
+                                                               controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
         this.isStopped = false;
     }
 
@@ -87,6 +95,11 @@ public class Janitor {
         final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
         janitorExecutor.scheduleAtFixedRate(attemptCompletionTask, attemptCompletionPeriod, attemptCompletionPeriod, attemptCompletionRateUnit);
+
+        // Start task for completing incomplete payment attempts
+        final TimeUnit erroredCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
+        final long erroredCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
+        janitorExecutor.scheduleAtFixedRate(erroredPaymentCompletionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
     }
 
     public void stop() {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
index 992f9b5..99369a2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/PendingTransactionTask.java
@@ -20,10 +20,13 @@ package org.killbill.billing.payment.core.janitor;
 import java.util.List;
 
 import org.killbill.billing.account.api.AccountInternalApi;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PluginControlledPaymentAutomatonRunner;
 import org.killbill.billing.payment.core.sm.RetryStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
+import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
@@ -41,10 +44,10 @@ final class PendingTransactionTask extends CompletionTaskBase<Integer> {
     private final List<Integer> itemsForIterations;
 
     public PendingTransactionTask(final Janitor janitor, final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
-                                 final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock,
+                                 final NonEntityDao nonEntityDao, final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                  final RetryStateMachineHelper retrySMHelper, final CacheControllerDispatcher controllerDispatcher, final AccountInternalApi accountInternalApi,
-                                 final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner) {
-        super(janitor, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, retrySMHelper, controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner);
+                                 final PluginControlledPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+        super(janitor, internalCallContextFactory, paymentConfig, nonEntityDao, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, controllerDispatcher, accountInternalApi, pluginControlledPaymentAutomatonRunner, pluginRegistry);
         this.itemsForIterations = ImmutableList.of(new Integer(1));
     }
 
@@ -55,7 +58,9 @@ final class PendingTransactionTask extends CompletionTaskBase<Integer> {
 
     @Override
     public void doIteration(final Integer item) {
-        int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), fakeCallContext);
+
+        // TODO this is needs to be fixed see- #230
+        int result = paymentDao.failOldPendingTransactions(TransactionStatus.PLUGIN_FAILURE, getCreatedDateBefore(), completionTaskCallContext);
         if (result > 0) {
             log.info("Janitor PendingTransactionTask moved " + result + " PENDING payments ->  PLUGIN_FAILURE");
         }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentEnteringStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentEnteringStateCallback.java
index ec869d4..e31416e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentEnteringStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentEnteringStateCallback.java
@@ -52,7 +52,7 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
         // If the transaction was not created -- for instance we had an exception in leavingState callback then we bail; if not, then update state:
         if (paymentStateContext.getPaymentTransactionModelDao() != null && paymentStateContext.getPaymentTransactionModelDao().getId() != null) {
             final PaymentTransactionInfoPlugin paymentInfoPlugin = paymentStateContext.getPaymentInfoPlugin();
-            final TransactionStatus transactionStatus = paymentPluginStatusToTransactionStatus(paymentInfoPlugin, operationResult);
+            final TransactionStatus transactionStatus = paymentPluginStatusToTransactionStatus(paymentInfoPlugin);
             // The bus event will be posted from the transaction
             daoHelper.processPaymentInfoPlugin(transactionStatus, paymentInfoPlugin, newState.getName());
         } else if (!paymentStateContext.isApiPayment()) {
@@ -75,22 +75,19 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
         }
     }
 
-    private TransactionStatus paymentPluginStatusToTransactionStatus(@Nullable final PaymentTransactionInfoPlugin paymentInfoPlugin, final OperationResult operationResult) {
-        if (paymentInfoPlugin == null) {
-            if (OperationResult.EXCEPTION.equals(operationResult)) {
-                // We got an exception during the plugin call
-                return TransactionStatus.PLUGIN_FAILURE;
-            } else {
-                // The plugin completed the call but returned null?! Bad plugin...
-                return TransactionStatus.PLUGIN_FAILURE;
-            }
-        }
+    public static TransactionStatus paymentPluginStatusToTransactionStatus(@Nullable final PaymentTransactionInfoPlugin paymentInfoPlugin) {
 
-        if (paymentInfoPlugin.getStatus() == null) {
-            // The plugin completed the call but returned an incomplete PaymentInfoPlugin?! Bad plugin...
-            return TransactionStatus.UNKNOWN;
+        //
+        // paymentInfoPlugin when we got an exception from the plugin, or if the plugin behaves badly
+        // and decides to return null; in all cases this is seen as a PLUGIN_FAILURE
+        //
+        if (paymentInfoPlugin == null || paymentInfoPlugin.getStatus() == null) {
+                return TransactionStatus.PLUGIN_FAILURE;
         }
 
+        //
+        // The plugin returned a status or it timedout and we added manually a UNKNOWN status to end up here
+        //
         switch (paymentInfoPlugin.getStatus()) {
             case PROCESSED:
                 return TransactionStatus.SUCCESS;
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentOperation.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentOperation.java
index 518042c..1a44ca5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentOperation.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentOperation.java
@@ -94,6 +94,18 @@ public abstract class PaymentOperation extends OperationCallbackBase<PaymentTran
     @Override
     protected OperationException wrapTimeoutException(final PaymentStateContext paymentStateContext, final TimeoutException e) {
         logger.error("Plugin call TIMEOUT for account {}", paymentStateContext.getAccount().getExternalKey());
+
+        final PaymentTransactionInfoPlugin paymentInfoPlugin = new DefaultNoOpPaymentInfoPlugin(paymentStateContext.getPaymentId(),
+                                                                                                paymentStateContext.getTransactionId(),
+                                                                                                paymentStateContext.getTransactionType(),
+                                                                                                paymentStateContext.getPaymentTransactionModelDao().getProcessedAmount(),
+                                                                                                paymentStateContext.getPaymentTransactionModelDao().getProcessedCurrency(),
+                                                                                                paymentStateContext.getPaymentTransactionModelDao().getEffectiveDate(),
+                                                                                                paymentStateContext.getPaymentTransactionModelDao().getCreatedDate(),
+                                                                                                PaymentPluginStatus.UNDEFINED,
+                                                                                                null);
+
+        paymentStateContext.setPaymentInfoPlugin(paymentInfoPlugin);
         return new OperationException(e, OperationResult.EXCEPTION);
     }
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentStateMachineHelper.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentStateMachineHelper.java
index 763ec9c..bc58d65 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentStateMachineHelper.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/PaymentStateMachineHelper.java
@@ -33,6 +33,9 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
+/**
+ * This class needs to know about the payment state machine xml file. All the knowledge about the xml file is encapsulated here.
+ */
 public class PaymentStateMachineHelper {
 
     private static final String BIG_BANG_STATE_MACHINE_NAME = "BIG_BANG";
@@ -44,17 +47,35 @@ public class PaymentStateMachineHelper {
     private static final String VOID_STATE_MACHINE_NAME = "VOID";
     private static final String CHARGEBACK_STATE_MACHINE_NAME = "CHARGEBACK";
 
-
-    private static final String BIG_BANG_INIT_STATE_NAME = "BIG_BANG_INIT";
-    private static final String AUTHORIZE_INIT_STATE_NAME = "AUTH_INIT";
-    private static final String CAPTURE_INIT_STATE_NAME = "CAPTURE_INIT";
-    private static final String PURCHASE_INIT_STATE_NAME = "PURCHASE_INIT";
-    private static final String REFUND_INIT_STATE_NAME = "REFUND_INIT";
-    private static final String CREDIT_INIT_STATE_NAME = "CREDIT_INIT";
-    private static final String VOID_INIT_STATE_NAME = "VOID_INIT";
-    private static final String CHARGEBACK_INIT_STATE_NAME = "CHARGEBACK_INIT";
-
+    private static final String BIG_BANG_INIT = "BIG_BANG_INIT";
+
+    private static final String AUTHORIZE_SUCCESS = "AUTH_SUCCESS";
+    private static final String CAPTURE_SUCCESS = "CAPTURE_SUCCESS";
+    private static final String PURCHASE_SUCCESS = "PURCHASE_SUCCESS";
+    private static final String REFUND_SUCCESS = "REFUND_SUCCESS";
+    private static final String CREDIT_SUCCESS = "CREDIT_SUCCESS";
+    private static final String VOID_SUCCESS = "VOID_SUCCESS";
+    private static final String CHARGEBACK_SUCCESS = "CHARGEBACK_SUCCESS";
+
+    private static final String AUTHORIZE_PENDING = "AUTHORIZE_PENDING";
+
+    private static final String AUTHORIZE_FAILED = "AUTH_FAILED";
+    private static final String CAPTURE_FAILED = "CAPTURE_FAILED";
+    private static final String PURCHASE_FAILED = "PURCHASE_FAILED";
+    private static final String REFUND_FAILED = "REFUND_FAILED";
+    private static final String CREDIT_FAILED = "CREDIT_FAILED";
+    private static final String VOID_FAILED = "VOID_FAILED";
+    private static final String CHARGEBACK_FAILED = "CHARGEBACK_FAILED";
+
+    private static final String AUTH_ERRORED = "AUTH_ERRORED";
+    private static final String CAPTURE_ERRORED = "CAPTURE_ERRORED";
+    private static final String PURCHASE_ERRORED = "PURCHASE_ERRORED";
+    private static final String REFUND_ERRORED = "REFUND_ERRORED";
+    private static final String CREDIT_ERRORED = "CREDIT_ERRORED";
+    private static final String VOID_ERRORED = "VOID_ERRORED";
+    private static final String CHARGEBACK_ERRORED = "CHARGEBACK_ERRORED";
     private final StateMachineConfig stateMachineConfig;
+    private final String[] errorStateNames = {AUTH_ERRORED, CAPTURE_ERRORED, PURCHASE_ERRORED, REFUND_ERRORED, CREDIT_ERRORED, VOID_ERRORED, CHARGEBACK_ERRORED};
 
     @Inject
     public PaymentStateMachineHelper(@javax.inject.Named(PaymentModule.STATE_MACHINE_PAYMENT) final StateMachineConfig stateMachineConfig) {
@@ -62,12 +83,85 @@ public class PaymentStateMachineHelper {
     }
 
     public State getState(final String stateName) throws MissingEntryException {
-        final StateMachine stateMachine  = stateMachineConfig.getStateMachineForState(stateName);
+        final StateMachine stateMachine = stateMachineConfig.getStateMachineForState(stateName);
         return stateMachine.getState(stateName);
     }
 
     public String getInitStateNameForTransaction() {
-        return BIG_BANG_INIT_STATE_NAME;
+        return BIG_BANG_INIT;
+    }
+
+    public String getSuccessfulStateForTransaction(final TransactionType transactionType) {
+        switch (transactionType) {
+            case AUTHORIZE:
+                return AUTHORIZE_SUCCESS;
+            case CAPTURE:
+                return CAPTURE_SUCCESS;
+            case PURCHASE:
+                return PURCHASE_SUCCESS;
+            case REFUND:
+                return REFUND_SUCCESS;
+            case CREDIT:
+                return CREDIT_SUCCESS;
+            case VOID:
+                return VOID_SUCCESS;
+            case CHARGEBACK:
+                return CHARGEBACK_SUCCESS;
+            default:
+                throw new IllegalStateException("Unsupported transaction type " + transactionType);
+        }
+    }
+
+    public String getPendingStateForTransaction(final TransactionType transactionType) {
+        switch (transactionType) {
+            case AUTHORIZE:
+                return AUTHORIZE_PENDING;
+            default:
+                throw new IllegalStateException("Unsupported transaction type " + transactionType);
+        }
+    }
+
+
+    public String getErroredStateForTransaction(final TransactionType transactionType) {
+        switch (transactionType) {
+            case AUTHORIZE:
+                return AUTH_ERRORED;
+            case CAPTURE:
+                return CAPTURE_ERRORED;
+            case PURCHASE:
+                return PURCHASE_ERRORED;
+            case REFUND:
+                return REFUND_ERRORED;
+            case CREDIT:
+                return CREDIT_ERRORED;
+            case VOID:
+                return VOID_ERRORED;
+            case CHARGEBACK:
+                return CHARGEBACK_ERRORED;
+            default:
+                throw new IllegalStateException("Unsupported transaction type " + transactionType);
+        }
+    }
+
+    public String getFailureStateForTransaction(final TransactionType transactionType) {
+        switch (transactionType) {
+            case AUTHORIZE:
+                return AUTHORIZE_FAILED;
+            case CAPTURE:
+                return CAPTURE_FAILED;
+            case PURCHASE:
+                return PURCHASE_FAILED;
+            case REFUND:
+                return REFUND_FAILED;
+            case CREDIT:
+                return CREDIT_FAILED;
+            case VOID:
+                return VOID_FAILED;
+            case CHARGEBACK:
+                return CHARGEBACK_FAILED;
+            default:
+                throw new IllegalStateException("Unsupported transaction type " + transactionType);
+        }
     }
 
     public StateMachine getStateMachineForStateName(final String stateName) throws MissingEntryException {
@@ -118,4 +212,9 @@ public class PaymentStateMachineHelper {
         }).orNull();
         return transition != null ? transition.getFinalState() : null;
     }
+
+    public String[] getErroredStateNames() {
+        return errorStateNames;
+    }
+
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryStateMachineHelper.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryStateMachineHelper.java
index 30bb1ed..0f261cc 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryStateMachineHelper.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/RetryStateMachineHelper.java
@@ -37,6 +37,7 @@ public class RetryStateMachineHelper {
     private static final String INIT_STATE_NAME = "INIT";
     private static final String RETRIED_STATE_NAME = "RETRIED";
 
+
     private final StateMachineConfig retryStateMachineConfig;
     private final StateMachine retryStateMachine;
     private final Operation retryOperation;
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 6423866..6719247 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.management.ImmutableDescriptor;
 
 import org.joda.time.DateTime;
 import org.killbill.billing.callcontext.InternalCallContext;
@@ -60,6 +61,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
 
 public class DefaultPaymentDao implements PaymentDao {
 
@@ -165,6 +167,8 @@ public class DefaultPaymentDao implements PaymentDao {
                             return input.getId().toString();
                         }
                     });
+
+
                     return transactional.failOldPendingTransactions(oldPendingTransactionIds, TransactionStatus.PAYMENT_FAILURE.toString(), context);
                 }
                 return 0;
@@ -322,6 +326,16 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
+    public List<PaymentModelDao> getPaymentsByStates(final String[] states, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit, final InternalTenantContext context) {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentModelDao>>() {
+            @Override
+            public List<PaymentModelDao> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                return entitySqlDaoWrapperFactory.become(PaymentSqlDao.class).getPaymentsByStates(ImmutableList.copyOf(states), createdBeforeDate.toDate(), createdAfterDate.toDate(), context, limit);
+            }
+        });
+    }
+
+    @Override
     public List<PaymentTransactionModelDao> getTransactionsForAccount(final UUID accountId, final InternalTenantContext context) {
         Preconditions.checkArgument(context.getAccountRecordId() != null);
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 8055938..1119b85 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,13 +30,13 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public int failOldPendingTransactions(final TransactionStatus newTransactionStatus, final DateTime createdBeforeDate, final InternalCallContext context);
+    public int failOldPendingTransactions(TransactionStatus newTransactionStatus, DateTime createdBeforeDate, InternalCallContext context);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
     public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
 
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByState(String stateName, final DateTime createdBeforeDate, InternalTenantContext context);
+    public List<PaymentAttemptModelDao> getPaymentAttemptsByState(String stateName, DateTime createdBeforeDate, InternalTenantContext context);
 
     public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);
 
@@ -54,7 +54,7 @@ public interface PaymentDao {
 
     public PaymentTransactionModelDao updatePaymentWithNewTransaction(UUID paymentId, PaymentTransactionModelDao paymentTransaction, InternalCallContext context);
 
-    public void updatePaymentAndTransactionOnCompletion(UUID accountId, UUID paymentId, final TransactionType transactionType, String currentPaymentStateName, String lastPaymentSuccessStateName, UUID transactionId,
+    public void updatePaymentAndTransactionOnCompletion(UUID accountId, UUID paymentId, TransactionType transactionType, String currentPaymentStateName, String lastPaymentSuccessStateName, UUID transactionId,
                                                         TransactionStatus paymentStatus, BigDecimal processedAmount, Currency processedCurrency,
                                                         String gatewayErrorCode, String gatewayErrorMsg, InternalCallContext context);
 
@@ -64,6 +64,8 @@ public interface PaymentDao {
 
     public List<PaymentModelDao> getPaymentsForAccount(UUID accountId, InternalTenantContext context);
 
+    public List<PaymentModelDao> getPaymentsByStates(String [] states, DateTime createdBeforeDate, DateTime createdAfterDate, int limit, InternalTenantContext context);
+
     public List<PaymentTransactionModelDao> getTransactionsForAccount(UUID accountId, InternalTenantContext context);
 
     public List<PaymentTransactionModelDao> getTransactionsForPayment(UUID paymentId, InternalTenantContext context);
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
index 432d130..4d8dd0c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
@@ -16,7 +16,10 @@
 
 package org.killbill.billing.payment.dao;
 
+import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
@@ -30,6 +33,7 @@ import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
 
 @EntitySqlDaoStringTemplate
 public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
@@ -56,6 +60,14 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
     @SqlQuery
     public PaymentModelDao getPaymentByExternalKey(@Bind("externalKey") final String externalKey,
                                                    @BindBean final InternalTenantContext context);
+
+    @SqlQuery
+    public List<PaymentModelDao> getPaymentsByStates(@StateCollectionBinder final Collection<String> states,
+                                                     @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                     @Bind("createdAfterDate") final Date createdAfterDate,
+                                                     @BindBean final InternalTenantContext context,
+                                                     @Bind("limit") final int limit);
+
     @SqlQuery
     @SmartFetchSize(shouldStream = true)
     public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/StateCollectionBinder.java b/payment/src/main/java/org/killbill/billing/payment/dao/StateCollectionBinder.java
new file mode 100644
index 0000000..8aac0c9
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/StateCollectionBinder.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2014 Groupon, Inc
+ * Copyright 2014 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.dao;
+
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Collection;
+
+import org.killbill.billing.payment.dao.StateCollectionBinder.StateCollectionBinderFactory;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.BinderFactory;
+import org.skife.jdbi.v2.sqlobject.BindingAnnotation;
+
+
+@BindingAnnotation(StateCollectionBinderFactory.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.PARAMETER})
+public @interface StateCollectionBinder {
+
+    public static class StateCollectionBinderFactory implements BinderFactory {
+
+        @Override
+        public Binder build(Annotation annotation) {
+            return new Binder<StateCollectionBinder, Collection<String>>() {
+
+                @Override
+                public void bind(SQLStatement<?> query, StateCollectionBinder bind, Collection<String> states) {
+                    query.define("states", states);
+
+                    int idx = 0;
+                    for (String state : states) {
+                        query.bind("state_" + idx, state);
+                        idx++;
+                    }
+                }
+            };
+        }
+    }
+}
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
index 51ca239..3a96d8f 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -101,3 +101,15 @@ join payment_methods pm on pm.id = t.payment_method_id
 where pm.plugin_name = :pluginName
 ;
 >>
+
+getPaymentsByStates(states) ::= <<
+select
+<allTableFields("t.")>
+from <tableName()> t
+where
+created_date >= :createdAfterDate
+and created_date \<= :createdBeforeDate
+and state_name in (<states: {state | :state_<i0>}; separator="," >)
+limit :limit
+;
+>>
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 2207741..2adef8b 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -220,6 +220,11 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
+    public List<PaymentModelDao> getPaymentsByStates(final String[] states, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit, final InternalTenantContext context) {
+        return null;
+    }
+
+    @Override
     public List<PaymentTransactionModelDao> getTransactionsForAccount(final UUID accountId, final InternalTenantContext context) {
         synchronized (this) {
             return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index d8e3b15..69045eb 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -165,7 +165,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(transactions.size(), 2);
 
         paymentDao.updatePaymentAndTransactionOnCompletion(accountId, savedPayment.getId(), savedTransactionModelDao2.getTransactionType(), "AUTH_ABORTED", "AUTH_SUCCESS", transactionModelDao2.getId(), TransactionStatus.SUCCESS,
-                                                                 BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
+                                                           BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
 
         final PaymentModelDao savedPayment4 = paymentDao.getPayment(savedPayment.getId(), internalCallContext);
         assertEquals(savedPayment4.getId(), paymentModelDao.getId());
@@ -188,7 +188,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(savedTransactionModelDao4.getGatewayErrorMsg(), "nothing");
 
         paymentDao.updatePaymentAndTransactionOnCompletion(accountId, savedPayment.getId(), savedTransactionModelDao2.getTransactionType(), "AUTH_ABORTED", null, transactionModelDao2.getId(), TransactionStatus.SUCCESS,
-                                                                 BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
+                                                           BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
 
         final PaymentModelDao savedPayment4Again = paymentDao.getPayment(savedPayment.getId(), internalCallContext);
         assertEquals(savedPayment4Again.getId(), paymentModelDao.getId());
@@ -196,7 +196,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(savedPayment4Again.getLastSuccessStateName(), "AUTH_SUCCESS");
 
         paymentDao.updatePaymentAndTransactionOnCompletion(accountId, savedPayment.getId(), savedTransactionModelDao2.getTransactionType(), "AUTH_ABORTED", "AUTH_SUCCESS", transactionModelDao2.getId(), TransactionStatus.SUCCESS,
-                                                                 BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
+                                                           BigDecimal.ONE, Currency.USD, null, "nothing", internalCallContext);
 
         final PaymentModelDao savedPayment4Final = paymentDao.getPayment(savedPayment.getId(), internalCallContext);
         assertEquals(savedPayment4Final.getId(), paymentModelDao.getId());
@@ -285,11 +285,10 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         clock.addDays(1);
         final DateTime newTime = clock.getUTCNow();
 
-
         final InternalCallContext internalCallContextWithNewTime = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID, 1687L, UUID.randomUUID(),
-                                                                                        UUID.randomUUID().toString(), CallOrigin.TEST,
-                                                                                        UserType.TEST, "Testing", "This is a test",
-                                                                                        newTime, newTime);
+                                                                                           UUID.randomUUID().toString(), CallOrigin.TEST,
+                                                                                           UserType.TEST, "Testing", "This is a test",
+                                                                                           newTime, newTime);
 
         final PaymentTransactionModelDao transaction4 = new PaymentTransactionModelDao(initialTime, initialTime, null, transactionExternalKey4,
                                                                                        paymentModelDao.getId(), TransactionType.AUTHORIZE, newTime,
@@ -297,18 +296,20 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
                                                                                        "pending", "");
         paymentDao.updatePaymentWithNewTransaction(paymentModelDao.getId(), transaction4, internalCallContextWithNewTime);
 
-
         final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result.size(), 3);
 
-
         paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, newTime, internalCallContext);
 
         final List<PaymentTransactionModelDao> result2 = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result2.size(), 1);
 
         // Just to guarantee that next clock.getUTCNow() > newTime
-        try { Thread.sleep(1000); } catch (InterruptedException e) {};
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        ;
 
         paymentDao.failOldPendingTransactions(TransactionStatus.PAYMENT_FAILURE, clock.getUTCNow(), internalCallContextWithNewTime);
 
@@ -317,8 +318,146 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
 
     }
 
+    @Test(groups = "slow")
+    public void testPaymentByStates() {
+
+        final UUID paymentMethodId = UUID.randomUUID();
+        final UUID accountId = UUID.randomUUID();
+        final String externalKey1 = "XXhhhhooo1";
+        final String transactionExternalKey1 = "transactionXX1";
+
+        final String externalKey2 = "XXhhhhooo2";
+        final String transactionExternalKey2 = "transactionXX2";
+
+        final String externalKey3 = "XXhhhhooo3";
+        final String transactionExternalKey3 = "transactionXX3";
+
+        final String externalKey4 = "XXhhhhooo4";
+        final String transactionExternalKey4 = "transactionXX4";
+
+        final String externalKey5 = "XXhhhhooo5";
+        final String transactionExternalKey5 = "transactionXX5";
+
+        final DateTime createdAfterDate = clock.getUTCNow().minusDays(10);
+        final DateTime createdBeforeDate = clock.getUTCNow().minusDays(1);
+
+        // Right before createdAfterDate, so should not be returned
+        final DateTime createdDate1 = createdAfterDate.minusHours(1);
+        final PaymentModelDao paymentModelDao1 = new PaymentModelDao(createdDate1, createdDate1, accountId, paymentMethodId, externalKey1);
+        paymentModelDao1.setStateName("AUTH_ERRORED");
+        final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(createdDate1, createdDate1, null, transactionExternalKey1,
+                                                                                       paymentModelDao1.getId(), TransactionType.AUTHORIZE, createdDate1,
+                                                                                       TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                       "unknown", "");
+
+        final InternalCallContext context1 = new InternalCallContext(internalCallContext.getTenantRecordId(),
+                                                                     internalCallContext.getAccountRecordId(),
+                                                                     internalCallContext.getUserToken(),
+                                                                     internalCallContext.getCreatedBy(),
+                                                                     internalCallContext.getCallOrigin(),
+                                                                     internalCallContext.getContextUserType(),
+                                                                     internalCallContext.getReasonCode(),
+                                                                     internalCallContext.getComments(),
+                                                                     createdDate1,
+                                                                     createdDate1);
+        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao1, transaction1, context1);
+
+
+        // Right after createdAfterDate, so it should  be returned
+        final DateTime createdDate2 = createdAfterDate.plusHours(1);
+        final PaymentModelDao paymentModelDao2 = new PaymentModelDao(createdDate2, createdDate2, accountId, paymentMethodId, externalKey2);
+        paymentModelDao2.setStateName("CAPTURE_ERRORED");
+        final PaymentTransactionModelDao transaction2 = new PaymentTransactionModelDao(createdDate2, createdDate2, null, transactionExternalKey2,
+                                                                                       paymentModelDao2.getId(), TransactionType.AUTHORIZE, createdDate2,
+                                                                                       TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                       "unknown", "");
+
+        final InternalCallContext context2 = new InternalCallContext(internalCallContext.getTenantRecordId(),
+                                                                     internalCallContext.getAccountRecordId(),
+                                                                     internalCallContext.getUserToken(),
+                                                                     internalCallContext.getCreatedBy(),
+                                                                     internalCallContext.getCallOrigin(),
+                                                                     internalCallContext.getContextUserType(),
+                                                                     internalCallContext.getReasonCode(),
+                                                                     internalCallContext.getComments(),
+                                                                     createdDate2,
+                                                                     createdDate2);
+        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao2, transaction2, context2);
+
+        // Right before createdBeforeDate, so it should be returned
+        final DateTime createdDate3 = createdBeforeDate.minusDays(1);
+        final PaymentModelDao paymentModelDao3 = new PaymentModelDao(createdDate3, createdDate3, accountId, paymentMethodId, externalKey3);
+        paymentModelDao3.setStateName("CAPTURE_ERRORED");
+        final PaymentTransactionModelDao transaction3 = new PaymentTransactionModelDao(createdDate3, createdDate3, null, transactionExternalKey3,
+                                                                                       paymentModelDao3.getId(), TransactionType.AUTHORIZE, createdDate3,
+                                                                                       TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                       "unknown", "");
+
+        final InternalCallContext context3 = new InternalCallContext(internalCallContext.getTenantRecordId(),
+                                                                     internalCallContext.getAccountRecordId(),
+                                                                     internalCallContext.getUserToken(),
+                                                                     internalCallContext.getCreatedBy(),
+                                                                     internalCallContext.getCallOrigin(),
+                                                                     internalCallContext.getContextUserType(),
+                                                                     internalCallContext.getReasonCode(),
+                                                                     internalCallContext.getComments(),
+                                                                     createdDate3,
+                                                                     createdDate3);
+
+        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao3, transaction3, context3);
+
+
+        // Right before createdBeforeDate but with a SUCCESS state so it should NOT be returned
+        final DateTime createdDate4 = createdBeforeDate.minusDays(1);
+        final PaymentModelDao paymentModelDao4 = new PaymentModelDao(createdDate4, createdDate4, accountId, paymentMethodId, externalKey4);
+        paymentModelDao4.setStateName("CAPTURE_SUCCESS");
+        final PaymentTransactionModelDao transaction4 = new PaymentTransactionModelDao(createdDate4, createdDate4, null, transactionExternalKey4,
+                                                                                       paymentModelDao4.getId(), TransactionType.AUTHORIZE, createdDate4,
+                                                                                       TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                       "unknown", "");
+
+        final InternalCallContext context4 = new InternalCallContext(internalCallContext.getTenantRecordId(),
+                                                                     internalCallContext.getAccountRecordId(),
+                                                                     internalCallContext.getUserToken(),
+                                                                     internalCallContext.getCreatedBy(),
+                                                                     internalCallContext.getCallOrigin(),
+                                                                     internalCallContext.getContextUserType(),
+                                                                     internalCallContext.getReasonCode(),
+                                                                     internalCallContext.getComments(),
+                                                                     createdDate4,
+                                                                     createdDate4);
+
+        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao4, transaction4, context4);
+
+        // Right after createdBeforeDate, so it should NOT be returned
+        final DateTime createdDate5 = createdBeforeDate.plusDays(1);
+        final PaymentModelDao paymentModelDao5 = new PaymentModelDao(createdDate5, createdDate5, accountId, paymentMethodId, externalKey5);
+        paymentModelDao5.setStateName("CAPTURE_ERRORED");
+        final PaymentTransactionModelDao transaction5 = new PaymentTransactionModelDao(createdDate5, createdDate5, null, transactionExternalKey5,
+                                                                                       paymentModelDao5.getId(), TransactionType.AUTHORIZE, createdDate5,
+                                                                                       TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                       "unknown", "");
+
+        final InternalCallContext context5 = new InternalCallContext(internalCallContext.getTenantRecordId(),
+                                                                     internalCallContext.getAccountRecordId(),
+                                                                     internalCallContext.getUserToken(),
+                                                                     internalCallContext.getCreatedBy(),
+                                                                     internalCallContext.getCallOrigin(),
+                                                                     internalCallContext.getContextUserType(),
+                                                                     internalCallContext.getReasonCode(),
+                                                                     internalCallContext.getComments(),
+                                                                     createdDate5,
+                                                                     createdDate5);
+
+        paymentDao.insertPaymentWithFirstTransaction(paymentModelDao5, transaction5, context5);
+
+        final String[] errorStates = {"AUTH_ERRORED", "CAPTURE_ERRORED", "REFUND_ERRORED", "CREDIT_ERRORED"};
+        final List<PaymentModelDao> result = paymentDao.getPaymentsByStates(errorStates, createdBeforeDate, createdAfterDate, 10, internalCallContext);
+        assertEquals(result.size(), 2);
+    }
+
     private List<PaymentTransactionModelDao> getPendingTransactions(final UUID paymentId) {
-        final List<PaymentTransactionModelDao> total =  paymentDao.getTransactionsForPayment(paymentId, internalCallContext);
+        final List<PaymentTransactionModelDao> total = paymentDao.getTransactionsForPayment(paymentId, internalCallContext);
         return ImmutableList.copyOf(Iterables.filter(total, new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
diff --git a/util/src/main/java/org/killbill/billing/util/config/InvoiceConfig.java b/util/src/main/java/org/killbill/billing/util/config/InvoiceConfig.java
index 6f8f4bc..c32ec52 100644
--- a/util/src/main/java/org/killbill/billing/util/config/InvoiceConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/InvoiceConfig.java
@@ -34,7 +34,7 @@ public interface InvoiceConfig extends KillbillConfig {
 
     @Config("org.killbill.invoice.usage.insert.zero.amount")
     @Default("true")
-    @Description("Whether to suppress usage items with a zero amount")
+    @Description("Whether to insert usage items with a zero amount")
     public boolean isInsertZeroUsageItems();
 
 }