killbill-memoizeit

Changes

Details

diff --git a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
index 3b5889f..c261a13 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
@@ -13,19 +13,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package org.killbill.billing.events;
-
-import java.util.UUID;
 
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
 
+public interface PaymentErrorInternalEvent extends PaymentInternalEvent {
     public String getMessage();
-
-    public UUID getAccountId();
-
-    public UUID getPaymentId();
-
-    public TransactionType getTransactionType();
 }
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
index f1265b9..2e30ab6 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
@@ -16,27 +16,5 @@
 
 package org.killbill.billing.events;
 
-import java.math.BigDecimal;
-import java.util.UUID;
-
-import org.joda.time.DateTime;
-import org.killbill.billing.catalog.api.Currency;
-import org.killbill.billing.payment.api.TransactionStatus;
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentInfoInternalEvent extends BusInternalEvent {
-
-    public UUID getPaymentId();
-
-    public UUID getAccountId();
-
-    public BigDecimal getAmount();
-
-    public Currency getCurrency();
-
-    public DateTime getEffectiveDate();
-
-    public TransactionStatus getStatus();
-
-    public TransactionType getTransactionType();
+public interface PaymentInfoInternalEvent extends PaymentInternalEvent {
 }
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
new file mode 100644
index 0000000..63d38e4
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
+
+public interface PaymentInternalEvent extends BusInternalEvent {
+
+    public UUID getAccountId();
+
+    public UUID getPaymentId();
+
+    public UUID getPaymentTransactionId();
+
+    public BigDecimal getAmount();
+
+    public Currency getCurrency();
+
+    public TransactionStatus getStatus();
+
+    public TransactionType getTransactionType();
+
+    public DateTime getEffectiveDate();
+}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
index a0198cc..be1a80b 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
@@ -13,19 +13,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package org.killbill.billing.events;
-
-import java.util.UUID;
 
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentPluginErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
 
+public interface PaymentPluginErrorInternalEvent extends PaymentInternalEvent {
     public String getMessage();
-
-    public UUID getAccountId();
-
-    public UUID getPaymentId();
-
-    public TransactionType getTransactionType();
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index 86d77d9..ca63e2b 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -37,6 +37,7 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
 import org.killbill.billing.events.RepairSubscriptionInternalEvent;
 import org.killbill.billing.util.config.InvoiceConfig;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
@@ -60,6 +61,7 @@ public class InvoiceListener {
         this.clock = clock;
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handleRepairSubscriptionEvent(final RepairSubscriptionInternalEvent event) {
 
@@ -71,6 +73,7 @@ public class InvoiceListener {
         }
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
 
@@ -89,6 +92,7 @@ public class InvoiceListener {
         }
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handleEntitlementTransition(final EffectiveEntitlementInternalEvent event) {
 
@@ -100,6 +104,7 @@ public class InvoiceListener {
         }
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index e293356..cb6ed8c 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -31,6 +31,7 @@ import org.killbill.clock.Clock;
 import org.killbill.billing.events.ControlTagDeletionInternalEvent;
 import org.killbill.billing.util.tag.ControlTagType;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
@@ -51,6 +52,7 @@ public class InvoiceTagHandler {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
 
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
index cae76d5..1b4a1d4 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/util/KillbillEventHandler.java
@@ -24,6 +24,7 @@ import org.killbill.billing.events.BusInternalEvent;
 import org.killbill.billing.util.userrequest.CompletionUserRequest;
 import org.killbill.billing.util.userrequest.CompletionUserRequestNotifier;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 
 public class KillbillEventHandler {
@@ -55,6 +56,7 @@ public class KillbillEventHandler {
     /*
      * Killbill server event handler
      */
+    @AllowConcurrentEvents
     @Subscribe
     public void handleSubscriptionevents(final BusInternalEvent event) {
         final List<CompletionUserRequestNotifier> runningWaiters = new ArrayList<CompletionUserRequestNotifier>();
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java b/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
index a8d11b9..fa2e581 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/listener/OverdueListener.java
@@ -50,6 +50,7 @@ import org.killbill.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
@@ -73,6 +74,7 @@ public class OverdueListener {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handle_OVERDUE_ENFORCEMENT_OFF_Insert(final ControlTagCreationInternalEvent event) {
         if (event.getTagDefinition().getName().equals(ControlTagType.OVERDUE_ENFORCEMENT_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
@@ -80,6 +82,7 @@ public class OverdueListener {
         }
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handle_OVERDUE_ENFORCEMENT_OFF_Removal(final ControlTagDeletionInternalEvent event) {
         if (event.getTagDefinition().getName().equals(ControlTagType.OVERDUE_ENFORCEMENT_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
@@ -87,18 +90,21 @@ public class OverdueListener {
         }
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handlePaymentInfoEvent(final PaymentInfoInternalEvent event) {
         log.debug("Received PaymentInfo event {}", event);
         insertBusEventIntoNotificationQueue(event.getAccountId(), event, OverdueAsyncBusNotificationAction.REFRESH, event.getSearchKey2());
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handlePaymentErrorEvent(final PaymentErrorInternalEvent event) {
         log.debug("Received PaymentError event {}", event);
         insertBusEventIntoNotificationQueue(event.getAccountId(), event, OverdueAsyncBusNotificationAction.REFRESH, event.getSearchKey2());
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void handleInvoiceAdjustmentEvent(final InvoiceAdjustmentInternalEvent event) {
         log.debug("Received InvoiceAdjustment event {}", event);
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
index d81ebad..48d93b7 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
@@ -16,55 +16,36 @@
 
 package org.killbill.billing.payment.api;
 
+import java.math.BigDecimal;
 import java.util.UUID;
 
-import org.killbill.billing.events.BusEventBase;
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.events.PaymentErrorInternalEvent;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErrorInternalEvent {
+public class DefaultPaymentErrorEvent extends DefaultPaymentInternalEvent implements PaymentErrorInternalEvent {
 
     private final String message;
-    private final UUID accountId;
-    private final UUID paymentId;
-    private final TransactionType transactionType;
 
     @JsonCreator
     public DefaultPaymentErrorEvent(@JsonProperty("accountId") final UUID accountId,
                                     @JsonProperty("paymentId") final UUID paymentId,
-                                    @JsonProperty("transactionType")  final TransactionType transactionType,
+                                    @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+                                    @JsonProperty("amount") final BigDecimal amount,
+                                    @JsonProperty("currency") final Currency currency,
+                                    @JsonProperty("status") final TransactionStatus status,
+                                    @JsonProperty("transactionType") final TransactionType transactionType,
+                                    @JsonProperty("effectiveDate") final DateTime effectiveDate,
                                     @JsonProperty("message") final String message,
                                     @JsonProperty("searchKey1") final Long searchKey1,
                                     @JsonProperty("searchKey2") final Long searchKey2,
                                     @JsonProperty("userToken") final UUID userToken) {
-        super(searchKey1, searchKey2, userToken);
+        super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
         this.message = message;
-        this.accountId = accountId;
-        this.paymentId = paymentId;
-        this.transactionType = transactionType;
-    }
-
-    @Override
-    public String getMessage() {
-        return message;
-    }
-
-    @Override
-    public UUID getAccountId() {
-        return accountId;
-    }
-
-    @Override
-    public UUID getPaymentId() {
-        return paymentId;
-    }
-
-    @Override
-    public TransactionType getTransactionType() {
-        return transactionType;
     }
 
     @JsonIgnore
@@ -74,49 +55,12 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
     }
 
     @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder("DefaultPaymentErrorEvent{");
-        sb.append("message='").append(message).append('\'');
-        sb.append(", accountId=").append(accountId);
-        sb.append(", paymentId=").append(paymentId);
-        sb.append(", transactionType=").append(transactionType);
-        sb.append('}');
-        return sb.toString();
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof DefaultPaymentErrorEvent)) {
-            return false;
-        }
-
-        final DefaultPaymentErrorEvent that = (DefaultPaymentErrorEvent) o;
-
-        if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
-            return false;
-        }
-        if (transactionType != null ? !transactionType.equals(that.transactionType) : that.transactionType != null) {
-            return false;
-        }
-        if (message != null ? !message.equals(that.message) : that.message != null) {
-            return false;
-        }
-        if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
-            return false;
-        }
-
-        return true;
+    public String getMessage() {
+        return message;
     }
 
     @Override
-    public int hashCode() {
-        int result = message != null ? message.hashCode() : 0;
-        result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
-        result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
-        result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
-        return result;
+    protected Class getPaymentInternalEventClass() {
+        return DefaultPaymentErrorEvent.class;
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
index d3d73b2..3caea55 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
@@ -21,58 +21,25 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.killbill.billing.catalog.api.Currency;
-import org.killbill.billing.events.BusEventBase;
 import org.killbill.billing.events.PaymentInfoInternalEvent;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfoInternalEvent {
+public class DefaultPaymentInfoEvent extends DefaultPaymentInternalEvent implements PaymentInfoInternalEvent {
 
-    private final UUID accountId;
-    private final UUID paymentId;
-    private final BigDecimal amount;
-    private final Currency currency;
-    private final TransactionStatus status;
-    private final TransactionType transactionType;
-    private final DateTime effectiveDate;
-
-    @JsonCreator
     public DefaultPaymentInfoEvent(@JsonProperty("accountId") final UUID accountId,
                                    @JsonProperty("paymentId") final UUID paymentId,
+                                   @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
                                    @JsonProperty("amount") final BigDecimal amount,
                                    @JsonProperty("currency") final Currency currency,
                                    @JsonProperty("status") final TransactionStatus status,
-                                   @JsonProperty("transactionType")  final TransactionType transactionType,
-                                   @JsonProperty("extFirstPaymentRefId") final String extFirstPaymentRefId /* TODO for backward compatibility only */,
-                                   @JsonProperty("extSecondPaymentRefId") final String extSecondPaymentRefId /* TODO for backward compatibility only */,
+                                   @JsonProperty("transactionType") final TransactionType transactionType,
                                    @JsonProperty("effectiveDate") final DateTime effectiveDate,
                                    @JsonProperty("searchKey1") final Long searchKey1,
                                    @JsonProperty("searchKey2") final Long searchKey2,
                                    @JsonProperty("userToken") final UUID userToken) {
-        super(searchKey1, searchKey2, userToken);
-        this.accountId = accountId;
-        this.paymentId = paymentId;
-        this.amount = amount;
-        this.currency = currency;
-        this.status = status;
-        this.transactionType = transactionType;
-        this.effectiveDate = effectiveDate;
-    }
-
-    public DefaultPaymentInfoEvent(final UUID accountId,
-                                   final UUID paymentId,
-                                   final BigDecimal amount,
-                                   final Currency currency,
-                                   final TransactionStatus status,
-                                   final TransactionType transactionType,
-                                   final DateTime effectiveDate,
-                                   final Long searchKey1,
-                                   final Long searchKey2,
-                                   final UUID userToken) {
-        this(accountId, paymentId, amount, currency, status, transactionType, null, null,
-             effectiveDate, searchKey1, searchKey2, userToken);
+        super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
     }
 
     @JsonIgnore
@@ -81,131 +48,9 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
         return BusInternalEventType.PAYMENT_INFO;
     }
 
-    @Override
-    public UUID getAccountId() {
-        return accountId;
-    }
-
-
-    @Override
-    public BigDecimal getAmount() {
-        return amount;
-    }
-
-    @Override
-    public Currency getCurrency() {
-        return currency;
-    }
-
-    @Override
-    public DateTime getEffectiveDate() {
-        return effectiveDate;
-    }
-
-    @Override
-    public UUID getPaymentId() {
-        return paymentId;
-    }
-
-    @Override
-    public TransactionType getTransactionType() {
-        return transactionType;
-    }
-
-    @Override
-    public TransactionStatus getStatus() {
-        return status;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("DefaultPaymentInfoEvent");
-        sb.append("{accountId=").append(accountId);
-        sb.append(", paymentId=").append(paymentId);
-        sb.append(", amount=").append(amount);
-        sb.append(", currency=").append(currency);
-        sb.append(", status=").append(status);
-        sb.append(", transactionType=").append(transactionType);
-        sb.append(", effectiveDate=").append(effectiveDate);
-        sb.append('}');
-        return sb.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result
-                 + ((accountId == null) ? 0 : accountId.hashCode());
-        result = prime * result + ((amount == null) ? 0 : amount.hashCode());
-        result = prime * result
-                 + ((effectiveDate == null) ? 0 : effectiveDate.hashCode());
-        result = prime * result
-                 + ((paymentId == null) ? 0 : paymentId.hashCode());
-        result = prime * result
-                 + ((currency == null) ? 0 : currency.hashCode());
-        result = prime * result + ((status == null) ? 0 : status.hashCode());
-        return result;
-    }
 
     @Override
-    public boolean equals(final Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final DefaultPaymentInfoEvent other = (DefaultPaymentInfoEvent) obj;
-        if (accountId == null) {
-            if (other.accountId != null) {
-                return false;
-            }
-        } else if (!accountId.equals(other.accountId)) {
-            return false;
-        }
-        if (transactionType == null) {
-            if (other.transactionType != null) {
-                return false;
-            }
-        } else if (!transactionType.equals(other.transactionType)) {
-            return false;
-        }
-        if (amount == null) {
-            if (other.amount != null) {
-                return false;
-            }
-        } else if (amount.compareTo(other.amount) != 0) {
-            return false;
-        }
-        if (effectiveDate == null) {
-            if (other.effectiveDate != null) {
-                return false;
-            }
-        } else if (effectiveDate.compareTo(other.effectiveDate) != 0) {
-            return false;
-        }
-        if (paymentId == null) {
-            if (other.paymentId != null) {
-                return false;
-            }
-        } else if (!paymentId.equals(other.paymentId)) {
-            return false;
-        }
-        if (currency == null) {
-            if (other.currency != null) {
-                return false;
-            }
-        } else if (!currency.equals(other.currency)) {
-            return false;
-        }
-        if (status != other.status) {
-            return false;
-        }
-        return true;
+    protected Class getPaymentInternalEventClass() {
+        return DefaultPaymentInfoEvent.class;
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java
new file mode 100644
index 0000000..84b2e36
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInternalEvent.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.api;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.BusEventBase;
+import org.killbill.billing.events.PaymentInternalEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public abstract class DefaultPaymentInternalEvent extends BusEventBase implements PaymentInternalEvent {
+
+    private final UUID accountId;
+    private final UUID paymentId;
+    private final UUID paymentTransactionId;
+    private final BigDecimal amount;
+    private final Currency currency;
+    private final TransactionStatus status;
+    private final TransactionType transactionType;
+    private final DateTime effectiveDate;
+
+    @JsonCreator
+    public DefaultPaymentInternalEvent(@JsonProperty("accountId") final UUID accountId,
+                                       @JsonProperty("paymentId") final UUID paymentId,
+                                       @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+                                       @JsonProperty("amount") final BigDecimal amount,
+                                       @JsonProperty("currency") final Currency currency,
+                                       @JsonProperty("status") final TransactionStatus status,
+                                       @JsonProperty("transactionType") final TransactionType transactionType,
+                                       @JsonProperty("effectiveDate") final DateTime effectiveDate,
+                                       @JsonProperty("searchKey1") final Long searchKey1,
+                                       @JsonProperty("searchKey2") final Long searchKey2,
+                                       @JsonProperty("userToken") final UUID userToken) {
+        super(searchKey1, searchKey2, userToken);
+        this.accountId = accountId;
+        this.paymentId = paymentId;
+        this.paymentTransactionId = paymentTransactionId;
+        this.amount = amount;
+        this.currency = currency;
+        this.status = status;
+        this.transactionType = transactionType;
+        this.effectiveDate = effectiveDate;
+    }
+
+    @Override
+    public UUID getAccountId() {
+        return accountId;
+    }
+
+    @Override
+    public BigDecimal getAmount() {
+        return amount;
+    }
+
+    @Override
+    public Currency getCurrency() {
+        return currency;
+    }
+
+    @Override
+    public DateTime getEffectiveDate() {
+        return effectiveDate;
+    }
+
+    @Override
+    public UUID getPaymentId() {
+        return paymentId;
+    }
+
+    @Override
+    public UUID getPaymentTransactionId() {
+        return paymentTransactionId;
+    }
+
+    @Override
+    public TransactionType getTransactionType() {
+        return transactionType;
+    }
+
+    @Override
+    public TransactionStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(getPaymentInternalEventClass().toString());
+        sb.append(" {accountId=").append(accountId);
+        sb.append(", paymentId=").append(paymentId);
+        sb.append(", paymentTransactionId=").append(paymentTransactionId);
+        sb.append(", amount=").append(amount);
+        sb.append(", currency=").append(currency);
+        sb.append(", status=").append(status);
+        sb.append(", transactionType=").append(transactionType);
+        sb.append(", effectiveDate=").append(effectiveDate);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result
+                 + ((accountId == null) ? 0 : accountId.hashCode());
+        result = prime * result + ((amount == null) ? 0 : amount.hashCode());
+        result = prime * result
+                 + ((effectiveDate == null) ? 0 : effectiveDate.hashCode());
+        result = prime * result
+                 + ((paymentId == null) ? 0 : paymentId.hashCode());
+        result = prime * result
+                 + ((paymentTransactionId == null) ? 0 : paymentTransactionId.hashCode());
+        result = prime * result
+                 + ((currency == null) ? 0 : currency.hashCode());
+        result = prime * result + ((status == null) ? 0 : status.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final DefaultPaymentInternalEvent other = (DefaultPaymentInternalEvent) obj;
+        if (accountId == null) {
+            if (other.accountId != null) {
+                return false;
+            }
+        } else if (!accountId.equals(other.accountId)) {
+            return false;
+        }
+        if (transactionType == null) {
+            if (other.transactionType != null) {
+                return false;
+            }
+        } else if (!transactionType.equals(other.transactionType)) {
+            return false;
+        }
+        if (amount == null) {
+            if (other.amount != null) {
+                return false;
+            }
+        } else if (amount.compareTo(other.amount) != 0) {
+            return false;
+        }
+        if (effectiveDate == null) {
+            if (other.effectiveDate != null) {
+                return false;
+            }
+        } else if (effectiveDate.compareTo(other.effectiveDate) != 0) {
+            return false;
+        }
+        if (paymentId == null) {
+            if (other.paymentId != null) {
+                return false;
+            }
+        } else if (!paymentId.equals(other.paymentId)) {
+            return false;
+        }
+        if (paymentTransactionId == null) {
+            if (other.paymentTransactionId != null) {
+                return false;
+            }
+        } else if (!paymentTransactionId.equals(other.paymentTransactionId)) {
+            return false;
+        }
+        if (currency == null) {
+            if (other.currency != null) {
+                return false;
+            }
+        } else if (!currency.equals(other.currency)) {
+            return false;
+        }
+        if (status != other.status) {
+            return false;
+        }
+        return true;
+    }
+
+    protected abstract Class getPaymentInternalEventClass();
+}
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
index 5ef1ac2..dee9ada 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
@@ -16,54 +16,36 @@
 
 package org.killbill.billing.payment.api;
 
+import java.math.BigDecimal;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.killbill.billing.events.BusEventBase;
-import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
-
-import java.util.UUID;
 
-public class DefaultPaymentPluginErrorEvent extends BusEventBase implements PaymentPluginErrorInternalEvent {
+public class DefaultPaymentPluginErrorEvent extends DefaultPaymentInternalEvent implements PaymentPluginErrorInternalEvent {
 
     private final String message;
-    private final UUID accountId;
-    private final UUID paymentId;
-    private final TransactionType transactionType;
 
     @JsonCreator
     public DefaultPaymentPluginErrorEvent(@JsonProperty("accountId") final UUID accountId,
                                           @JsonProperty("paymentId") final UUID paymentId,
-                                          @JsonProperty("transactionType")  final TransactionType transactionType,
+                                          @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+                                          @JsonProperty("amount") final BigDecimal amount,
+                                          @JsonProperty("currency") final Currency currency,
+                                          @JsonProperty("status") final TransactionStatus status,
+                                          @JsonProperty("transactionType") final TransactionType transactionType,
+                                          @JsonProperty("effectiveDate") final DateTime effectiveDate,
                                           @JsonProperty("message") final String message,
                                           @JsonProperty("searchKey1") final Long searchKey1,
                                           @JsonProperty("searchKey2") final Long searchKey2,
                                           @JsonProperty("userToken") final UUID userToken) {
-        super(searchKey1, searchKey2, userToken);
+        super(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, effectiveDate, searchKey1, searchKey2, userToken);
         this.message = message;
-        this.accountId = accountId;
-        this.paymentId = paymentId;
-        this.transactionType = transactionType;
-    }
-
-    @Override
-    public String getMessage() {
-        return message;
-    }
-
-    @Override
-    public UUID getAccountId() {
-        return accountId;
-    }
-
-    @Override
-    public UUID getPaymentId() {
-        return paymentId;
-    }
-
-    @Override
-    public TransactionType getTransactionType() {
-        return transactionType;
     }
 
     @JsonIgnore
@@ -73,38 +55,12 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
     }
 
     @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof DefaultPaymentPluginErrorEvent)) {
-            return false;
-        }
-
-        final DefaultPaymentPluginErrorEvent that = (DefaultPaymentPluginErrorEvent) o;
-
-        if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
-            return false;
-        }
-        if (transactionType != null ? !transactionType.equals(that.transactionType) : that.transactionType != null) {
-            return false;
-        }
-        if (message != null ? !message.equals(that.message) : that.message != null) {
-            return false;
-        }
-        if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
-            return false;
-        }
-
-        return true;
+    public String getMessage() {
+        return message;
     }
 
     @Override
-    public int hashCode() {
-        int result = message != null ? message.hashCode() : 0;
-        result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
-        result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
-        result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
-        return result;
+    protected Class getPaymentInternalEventClass() {
+        return DefaultPaymentPluginErrorEvent.class;
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index ac1a218..60281b4 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -17,12 +17,17 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.core.ProcessorBase;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
 import org.killbill.billing.payment.core.sm.PaymentStateMachineHelper;
 import org.killbill.billing.payment.dao.PaymentDao;
@@ -34,7 +39,12 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.globallocker.LockerType;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,13 +60,16 @@ abstract class CompletionTaskBase<T> implements Runnable {
     protected final PaymentControlStateMachineHelper retrySMHelper;
     protected final AccountInternalApi accountInternalApi;
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
+    protected final GlobalLocker locker;
+
+    protected NotificationQueue janitorQueue;
 
     private volatile boolean isStopped;
 
     public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                               final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                               final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                              final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
+                              final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
         this.internalCallContextFactory = internalCallContextFactory;
         this.paymentConfig = paymentConfig;
         this.paymentDao = paymentDao;
@@ -65,6 +78,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
         this.retrySMHelper = retrySMHelper;
         this.accountInternalApi = accountInternalApi;
         this.pluginRegistry = pluginRegistry;
+        this.locker = locker;
         this.isStopped = false;
     }
 
@@ -74,7 +88,7 @@ abstract class CompletionTaskBase<T> implements Runnable {
             log.info("Janitor was requested to stop");
             return;
         }
-        final List<T> items = getItemsForIteration();
+        final Iterable<T> items = getItemsForIteration();
         for (final T item : items) {
             if (isStopped) {
                 log.info("Janitor was requested to stop");
@@ -92,10 +106,38 @@ abstract class CompletionTaskBase<T> implements Runnable {
         this.isStopped = true;
     }
 
-    public abstract List<T> getItemsForIteration();
+    public abstract Iterable<T> getItemsForIteration();
 
     public abstract void doIteration(final T item);
 
+    public abstract void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException;
+
+    public void attachJanitorQueue(final NotificationQueue janitorQueue) {
+        this.janitorQueue = janitorQueue;
+    }
+
+    public interface JanitorIterationCallback {
+        public <T> T doIteration();
+    }
+
+    protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
+        GlobalLock lock = null;
+        try {
+            final Account account = accountInternalApi.getAccountByRecordId(internalTenantContext.getAccountRecordId(), internalTenantContext);
+            lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), account.getExternalKey(), ProcessorBase.NB_LOCK_TRY);
+            return callback.doIteration();
+        } catch (AccountApiException e) {
+            log.warn(String.format("Janitor failed to retrieve account with recordId %s", internalTenantContext.getAccountRecordId()), e);
+        } catch (LockFailedException e) {
+            log.warn(String.format("Janitor failed to lock account with recordId %s", internalTenantContext.getAccountRecordId()), e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+        return null;
+    }
+
     protected CallContext createCallContext(final String taskName, final InternalTenantContext internalTenantContext) {
         final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
         return new DefaultCallContext(tenantContext.getTenantId(), taskName, CallOrigin.INTERNAL, UserType.SYSTEM, UUIDs.randomUUID(), clock);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index ddd73dc..01a9101 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -17,6 +17,7 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Inject;
@@ -27,6 +28,7 @@ import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.TransactionStatus;
@@ -43,7 +45,10 @@ import org.killbill.billing.payment.plugin.api.PaymentPluginApi;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.billing.util.entity.Pagination;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationQueue;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -56,28 +61,36 @@ import com.google.common.collect.Iterables;
  */
 public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAttemptModelDao> {
 
+    //
+    // Each paymentAttempt *should* transition to a new state, so fetching a limited size will still allow us to progress (as opposed to fetching the same entries over and over)
+    // We also don't expect to see too many entries in the INIT state.
+    //
+    private final static long MAX_ATTEMPTS_PER_ITERATIONS = 1000L;
+
     private final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner;
 
     @Inject
     public IncompletePaymentAttemptTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                         final PaymentDao paymentDao, final Clock clock, final PaymentStateMachineHelper paymentStateMachineHelper,
                                         final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                                        final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner, final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+                                        final PluginRoutingPaymentAutomatonRunner pluginControlledPaymentAutomatonRunner,
+                                        final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
         this.pluginControlledPaymentAutomatonRunner = pluginControlledPaymentAutomatonRunner;
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getItemsForIteration() {
-        final List<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore());
-        if (!incompleteAttempts.isEmpty()) {
-            log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.size());
+    public Iterable<PaymentAttemptModelDao> getItemsForIteration() {
+        final Pagination<PaymentAttemptModelDao> incompleteAttempts = paymentDao.getPaymentAttemptsByStateAcrossTenants(retrySMHelper.getInitialState().getName(), getCreatedDateBefore(), 0L, MAX_ATTEMPTS_PER_ITERATIONS);
+        if (incompleteAttempts.getTotalNbRecords() > 0) {
+            log.info("Janitor AttemptCompletionTask start run: found {} incomplete attempts", incompleteAttempts.getTotalNbRecords());
         }
         return incompleteAttempts;
     }
 
     @Override
     public void doIteration(final PaymentAttemptModelDao attempt) {
+        // We don't grab account lock here as the lock will be taken when calling the completeRun API.
         final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(attempt.getTenantRecordId(), attempt.getAccountRecordId());
         final CallContext callContext = createCallContext("AttemptCompletionJanitorTask", tenantContext);
         final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(attempt.getAccountId(), callContext);
@@ -132,6 +145,11 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
         }
     }
 
+    @Override
+    public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) {
+        // Nothing
+    }
+
     private DateTime getCreatedDateBefore() {
         final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
         return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index f7d87a1..1547008 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -17,9 +17,12 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.UUID;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
@@ -27,6 +30,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
@@ -46,7 +50,12 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.clock.Clock;
+import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.skife.config.TimeSpan;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
@@ -59,72 +68,108 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                                                                                                           .add(TransactionStatus.PENDING)
                                                                                                           .add(TransactionStatus.UNKNOWN)
                                                                                                           .build();
-    private static final int MAX_ITEMS_PER_LOOP = 100;
 
     @Inject
     public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                             final PaymentDao paymentDao, final Clock clock,
                                             final PaymentStateMachineHelper paymentStateMachineHelper, final PaymentControlStateMachineHelper retrySMHelper, final AccountInternalApi accountInternalApi,
-                                            final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry) {
-        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry);
+                                            final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry, final GlobalLocker locker) {
+        super(internalCallContextFactory, paymentConfig, paymentDao, clock, paymentStateMachineHelper, retrySMHelper, accountInternalApi, pluginRegistry, locker);
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getItemsForIteration() {
-        final List<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), MAX_ITEMS_PER_LOOP);
-        if (!result.isEmpty()) {
-            log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.size());
-        }
-        return result;
+    public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
+        // This is not triggered by Janitor proper but instead relies on bus event + notificationQ
+        return ImmutableList.of();
     }
 
     @Override
     public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
-        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
-        final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
-        final PaymentModelDao payment = paymentDao.getPayment(paymentTransaction.getPaymentId(), internalTenantContext);
-
-        final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
-        final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
-
-        final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
-                                                                                                          paymentTransaction.getId(),
-                                                                                                          paymentTransaction.getTransactionType(),
-                                                                                                          paymentTransaction.getAmount(),
-                                                                                                          paymentTransaction.getCurrency(),
-                                                                                                          paymentTransaction.getCreatedDate(),
-                                                                                                          paymentTransaction.getCreatedDate(),
-                                                                                                          PaymentPluginStatus.UNDEFINED,
-                                                                                                          null);
-        PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
-        try {
-            final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
-            paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
-                @Override
-                public boolean apply(final PaymentTransactionInfoPlugin input) {
-                    return input.getKbTransactionPaymentId().equals(paymentTransaction.getId());
-                }
-            }).or(new Supplier<PaymentTransactionInfoPlugin>() {
-                @Override
-                public PaymentTransactionInfoPlugin get() {
-                    return undefinedPaymentTransaction;
+        // Nothing
+    }
+
+    public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
+
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
+        doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+            @Override
+            public Void doIteration() {
+
+                // State may have changed since we originally retrieved with no lock
+                final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
+
+                final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
+                final PaymentModelDao payment = paymentDao.getPayment(rehydratedPaymentTransaction.getPaymentId(), internalTenantContext);
+
+                final PaymentMethodModelDao paymentMethod = paymentDao.getPaymentMethod(payment.getPaymentMethodId(), internalTenantContext);
+                final PaymentPluginApi paymentPluginApi = getPaymentPluginApi(payment, paymentMethod.getPluginName());
+
+                final PaymentTransactionInfoPlugin undefinedPaymentTransaction = new DefaultNoOpPaymentInfoPlugin(payment.getId(),
+                                                                                                                  rehydratedPaymentTransaction.getId(),
+                                                                                                                  rehydratedPaymentTransaction.getTransactionType(),
+                                                                                                                  rehydratedPaymentTransaction.getAmount(),
+                                                                                                                  rehydratedPaymentTransaction.getCurrency(),
+                                                                                                                  rehydratedPaymentTransaction.getCreatedDate(),
+                                                                                                                  rehydratedPaymentTransaction.getCreatedDate(),
+                                                                                                                  PaymentPluginStatus.UNDEFINED,
+                                                                                                                  null);
+                PaymentTransactionInfoPlugin paymentTransactionInfoPlugin;
+                try {
+                    final List<PaymentTransactionInfoPlugin> result = paymentPluginApi.getPaymentInfo(payment.getAccountId(), payment.getId(), ImmutableList.<PluginProperty>of(), tenantContext);
+                    paymentTransactionInfoPlugin = Iterables.tryFind(result, new Predicate<PaymentTransactionInfoPlugin>() {
+                        @Override
+                        public boolean apply(final PaymentTransactionInfoPlugin input) {
+                            return input.getKbTransactionPaymentId().equals(rehydratedPaymentTransaction.getId());
+                        }
+                    }).or(new Supplier<PaymentTransactionInfoPlugin>() {
+                        @Override
+                        public PaymentTransactionInfoPlugin get() {
+                            return undefinedPaymentTransaction;
+                        }
+                    });
+                } catch (final Exception e) {
+                    paymentTransactionInfoPlugin = undefinedPaymentTransaction;
                 }
-            });
-        } catch (final Exception e) {
-            paymentTransactionInfoPlugin = undefinedPaymentTransaction;
-        }
+                updatePaymentAndTransactionIfNeeded(payment, notificationKey.getAttemptNumber(), userToken, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+                return null;
+            }
+        }, internalTenantContext);
 
-        updatePaymentAndTransactionIfNeeded(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
     }
 
-    public boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
-        final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
+    @Override
+    public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) {
+        if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(event.getStatus())) {
+            return;
+        }
+        insertNewNotificationForUnresolvedTransactionIfNeeded(event.getPaymentTransactionId(), 1, event.getUserToken(), event.getSearchKey1(), event.getSearchKey2());
+    }
 
+    public boolean updatePaymentAndTransactionIfNeededWithAccountLock(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
         // Can happen in the GET case, see PaymentProcessor#toPayment
         if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
             // Nothing to do
             return false;
         }
+        final Boolean result = doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+            @Override
+            public Boolean doIteration() {
+                return updatePaymentAndTransactionInternal(payment, null, null, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+            }
+        }, internalTenantContext);
+        return result != null && result;
+    }
+
+    private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final int attemptNumber, final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+        if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
+            // Nothing to do
+            return false;
+        }
+        return updatePaymentAndTransactionInternal(payment, attemptNumber, userToken, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+    }
+
+    private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+        final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
 
         // First obtain the new transactionStatus,
         // Then compute the new paymentState; this one is mostly interesting in case of success (to compute the lastSuccessPaymentState below)
@@ -146,6 +191,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                 log.info("Janitor IncompletePaymentTransactionTask unable to repair payment {}, transaction {}: {} -> {}",
                          payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
                 // We can't get anything interesting from the plugin...
+                insertNewNotificationForUnresolvedTransactionIfNeeded(paymentTransaction.getId(), attemptNumber, userToken, internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
                 return false;
         }
 
@@ -170,6 +216,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                                                            paymentTransaction.getId(), transactionStatus, processedAmount, processedCurrency, gatewayErrorCode, gatewayError, internalCallContext);
 
         return true;
+
     }
 
     // Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
@@ -190,13 +237,27 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
         return pluginApi;
     }
 
-    private DateTime getCreatedDateBefore() {
-        final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanDelay().getMillis();
-        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+    @VisibleForTesting
+    DateTime getNextNotificationTime(@Nullable final Integer attemptNumber) {
+
+        final List<TimeSpan> retries = paymentConfig.getIncompleteTransactionsRetries();
+        if (attemptNumber == null || attemptNumber > retries.size()) {
+            return null;
+        }
+        final TimeSpan nextDelay = retries.get(attemptNumber - 1);
+        return clock.getUTCNow().plusMillis((int) nextDelay.getMillis());
     }
 
-    private DateTime getCreatedDateAfter() {
-        final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanGiveup().getMillis();
-        return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
+    private void insertNewNotificationForUnresolvedTransactionIfNeeded(final UUID paymentTransactionId, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final NotificationEvent key = new JanitorNotificationKey(paymentTransactionId, IncompletePaymentTransactionTask.class.toString(), attemptNumber);
+        final DateTime notificationTime = getNextNotificationTime(attemptNumber);
+        // Will be null in the GET path or when we run out opf attempts..
+        if (notificationTime != null) {
+            try {
+                janitorQueue.recordFutureNotification(notificationTime, key, userToken, accountRecordId, tenantRecordId);
+            } catch (IOException e) {
+                log.warn("Janitor IncompletePaymentTransactionTask : Failed to insert future notification for paymentTransactionId = {}: {}", paymentTransactionId, e.getMessage());
+            }
+        }
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 032b1a0..3b06de1 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -17,14 +17,24 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.joda.time.DateTime;
+import org.killbill.billing.events.PaymentInternalEvent;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
 import org.killbill.billing.payment.glue.PaymentModule;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,19 +46,25 @@ public class Janitor {
     private static final Logger log = LoggerFactory.getLogger(Janitor.class);
 
     private static final int TERMINATION_TIMEOUT_SEC = 5;
+    public static final String QUEUE_NAME = "janitor";
 
+    private final NotificationQueueService notificationQueueService;
     private final ScheduledExecutorService janitorExecutor;
     private final PaymentConfig paymentConfig;
     private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
     private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
 
+    private NotificationQueue janitorQueue;
+
     private volatile boolean isStopped;
 
     @Inject
     public Janitor(final PaymentConfig paymentConfig,
+                   final NotificationQueueService notificationQueueService,
                    @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
                    final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
                    final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
+        this.notificationQueueService = notificationQueueService;
         this.janitorExecutor = janitorExecutor;
         this.paymentConfig = paymentConfig;
         this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
@@ -56,12 +72,36 @@ public class Janitor {
         this.isStopped = false;
     }
 
+    public void initialize() throws NotificationQueueAlreadyExists {
+        janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
+                                                                        QUEUE_NAME,
+                                                                        new NotificationQueueHandler() {
+                                                                            @Override
+                                                                            public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                if (!(notificationKey instanceof JanitorNotificationKey)) {
+                                                                                    log.error("Janitor service received an unexpected event type {}" + notificationKey.getClass().getName());
+                                                                                    return;
+
+                                                                                }
+                                                                                final JanitorNotificationKey janitorKey = (JanitorNotificationKey) notificationKey;
+                                                                                if (janitorKey.getTaskName().equals(incompletePaymentTransactionTask.getClass().toString())) {
+                                                                                    incompletePaymentTransactionTask.processNotification(janitorKey, userToken, accountRecordId, tenantRecordId);
+                                                                                }
+                                                                            }
+                                                                        }
+                                                                       );
+        incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
+        incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
+    }
+
     public void start() {
         if (isStopped) {
             log.warn("Janitor is not a restartable service, and was already started, aborting");
             return;
         }
 
+        janitorQueue.startQueue();
+
         // Start task for completing incomplete payment attempts
         final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
@@ -73,7 +113,7 @@ public class Janitor {
         janitorExecutor.scheduleAtFixedRate(incompletePaymentTransactionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
     }
 
-    public void stop() {
+    public void stop() throws NoSuchNotificationQueue {
         if (isStopped) {
             log.warn("Janitor is already in a stopped state");
             return;
@@ -93,6 +133,11 @@ public class Janitor {
             if (!success) {
                 log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
             }
+
+            if (janitorQueue != null) {
+                janitorQueue.stopQueue();
+                notificationQueueService.deleteNotificationQueue(DefaultPaymentService.SERVICE_NAME, QUEUE_NAME);
+            }
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             log.warn("Janitor stop sequence got interrupted");
@@ -100,4 +145,9 @@ public class Janitor {
             isStopped = true;
         }
     }
+
+    public void processPaymentEvent(final PaymentInternalEvent event) {
+        incompletePaymentAttemptTask.processPaymentEvent(event, janitorQueue);
+        incompletePaymentTransactionTask.processPaymentEvent(event, janitorQueue);
+    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
new file mode 100644
index 0000000..b0f2c1d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.DefaultUUIDNotificationKey;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JanitorNotificationKey extends DefaultUUIDNotificationKey {
+
+    private final Integer attemptNumber;
+    private final String taskName;
+
+    @JsonCreator
+    public JanitorNotificationKey(@JsonProperty("uuidKey") final UUID uuidKey,
+                                  @JsonProperty("taskName") final String taskName,
+                                  @JsonProperty("attemptNumber") final Integer attemptNumber) {
+        super(uuidKey);
+        this.attemptNumber = attemptNumber;
+        this.taskName = taskName;
+    }
+
+    public Integer getAttemptNumber() {
+        return attemptNumber;
+    }
+
+    public String getTaskName() {
+        return taskName;
+    }
+}
\ No newline at end of file
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index cf6afd1..0af54d5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -416,7 +416,7 @@ public class PaymentProcessor extends ProcessorBase {
             if (paymentTransactionInfoPlugin != null) {
                 // Make sure to invoke the Janitor task in case the plugin fixes its state on the fly
                 // See https://github.com/killbill/killbill/issues/341
-                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeeded(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
+                final boolean hasChanged = incompletePaymentTransactionTask.updatePaymentAndTransactionIfNeededWithAccountLock(newPaymentModelDao, newPaymentTransactionModelDao, paymentTransactionInfoPlugin, internalTenantContext);
                 if (hasChanged) {
                     newPaymentModelDao = paymentDao.getPayment(newPaymentModelDao.getId(), internalTenantContext);
                     newPaymentTransactionModelDao = paymentDao.getPaymentTransaction(newPaymentTransactionModelDao.getId(), internalTenantContext);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
index 77a1265..ae4e3a2 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/ProcessorBase.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Iterables;
 
 public abstract class ProcessorBase {
 
-    private static final int NB_LOCK_TRY = 5;
+    public static final int NB_LOCK_TRY = 5;
 
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
     protected final AccountInternalApi accountInternalApi;
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
index 5b340aa..8cdc6eb 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
@@ -65,7 +65,12 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
             //
             final BusInternalEvent event = new DefaultPaymentErrorEvent(paymentStateContext.getAccount().getId(),
                                                                         null,
+                                                                        null,
+                                                                        paymentStateContext.getAmount(),
+                                                                        paymentStateContext.getCurrency(),
+                                                                        null,
                                                                         paymentStateContext.getTransactionType(),
+                                                                        null,
                                                                         "Early abortion of payment transaction",
                                                                         paymentStateContext.getInternalCallContext().getAccountRecordId(),
                                                                         paymentStateContext.getInternalCallContext().getTenantRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 3b41115..ff64501 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -20,11 +20,9 @@ package org.killbill.billing.payment.dao;
 
 import java.math.BigDecimal;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Date;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
@@ -41,10 +39,12 @@ import org.killbill.billing.payment.api.DefaultPaymentInfoEvent;
 import org.killbill.billing.payment.api.DefaultPaymentPluginErrorEvent;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentMethod;
+import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.billing.util.entity.Entity;
 import org.killbill.billing.util.entity.Pagination;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
@@ -58,7 +58,6 @@ import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -122,14 +121,24 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentAttemptModelDao>>() {
-            @Override
-            public List<PaymentAttemptModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                final PaymentAttemptSqlDao transactional = entitySqlDaoWrapperFactory.become(PaymentAttemptSqlDao.class);
-                return transactional.getByStateNameAcrossTenants(stateName, createdBeforeDate.toDate());
-            }
-        });
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
+
+        final Date createdBefore = createdBeforeDate.toDate();
+        return paginationHelper.getPagination(PaymentAttemptSqlDao.class, new PaginationIteratorBuilder<PaymentAttemptModelDao, Entity, PaymentAttemptSqlDao>() {
+                                                  @Override
+                                                  public Long getCount(final PaymentAttemptSqlDao sqlDao, final InternalTenantContext context) {
+                                                      return sqlDao.getCountByStateNameAcrossTenants(stateName, createdBefore);
+                                                  }
+                                                  @Override
+                                                  public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+                                                      return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit);
+                                                  }
+                                              },
+                                              offset,
+                                              limit,
+                                              null
+                                             );
+
     }
 
     @Override
@@ -157,18 +166,29 @@ public class DefaultPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final int limit) {
-        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<PaymentTransactionModelDao>>() {
-            @Override
-            public List<PaymentTransactionModelDao> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                final TransactionSqlDao transactional = entitySqlDaoWrapperFactory.become(TransactionSqlDao.class);
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, final DateTime createdBeforeDate, final DateTime createdAfterDate, final Long offset, final Long limit) {
 
-                final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses,  Functions.toStringFunction()));
-                 return transactional.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBeforeDate.toDate(), createdAfterDate.toDate(), limit);
-            }
-        });
-    }
+        final Collection<String> allTransactionStatus = ImmutableList.copyOf(Iterables.transform(transactionStatuses, Functions.toStringFunction()));
+        final Date createdBefore = createdBeforeDate.toDate();
+        final Date createdAfter = createdAfterDate.toDate();
+
+        return paginationHelper.getPagination(TransactionSqlDao.class,
+                                              new PaginationIteratorBuilder<PaymentTransactionModelDao, PaymentTransaction, TransactionSqlDao>() {
+                                                  @Override
+                                                  public Long getCount(final TransactionSqlDao sqlDao, final InternalTenantContext context) {
+                                                      return sqlDao.getCountByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter);
+                                                  }
 
+                                                  @Override
+                                                  public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
+                                                      return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit);
+                                                  }
+                                              },
+                                              offset,
+                                              limit,
+                                              null
+                                             );
+    }
 
     @Override
     public List<PaymentTransactionModelDao> getPaymentTransactionsByExternalKey(final String transactionExternalKey, final InternalTenantContext context) {
@@ -281,7 +301,7 @@ public class DefaultPaymentDao implements PaymentDao {
                 } else {
                     entitySqlDaoWrapperFactory.become(PaymentSqlDao.class).updatePaymentStateName(paymentId.toString(), currentPaymentStateName, context);
                 }
-                postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
+                postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, transactionId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
                 return null;
             }
         });
@@ -541,6 +561,7 @@ public class DefaultPaymentDao implements PaymentDao {
                                                  final TransactionStatus transactionStatus,
                                                  final TransactionType transactionType,
                                                  final UUID paymentId,
+                                                 final UUID transactionId,
                                                  final BigDecimal processedAmount,
                                                  final Currency processedCurrency,
                                                  final DateTime effectiveDate,
@@ -554,6 +575,7 @@ public class DefaultPaymentDao implements PaymentDao {
             case PENDING:
                 event = new DefaultPaymentInfoEvent(accountId,
                                                     paymentId,
+                                                    transactionId,
                                                     processedAmount,
                                                     processedCurrency,
                                                     transactionStatus,
@@ -567,7 +589,12 @@ public class DefaultPaymentDao implements PaymentDao {
             case PAYMENT_FAILURE:
                 event = new DefaultPaymentErrorEvent(accountId,
                                                      paymentId,
+                                                     transactionId,
+                                                     processedAmount,
+                                                     processedCurrency,
+                                                     transactionStatus,
                                                      transactionType,
+                                                     effectiveDate,
                                                      gatewayErrorCode,
                                                      context.getAccountRecordId(),
                                                      context.getTenantRecordId(),
@@ -578,7 +605,12 @@ public class DefaultPaymentDao implements PaymentDao {
             default:
                 event = new DefaultPaymentPluginErrorEvent(accountId,
                                                            paymentId,
+                                                           transactionId,
+                                                           processedAmount,
+                                                           processedCurrency,
+                                                           transactionStatus,
                                                            transactionType,
+                                                           effectiveDate,
                                                            gatewayErrorCode,
                                                            context.getAccountRecordId(),
                                                            context.getTenantRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
index 544e02e..cc2959e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -17,6 +17,7 @@
 package org.killbill.billing.payment.dao;
 
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 
 import org.killbill.billing.callcontext.InternalCallContext;
@@ -50,7 +51,13 @@ public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDa
                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
-                                                             @Bind("createdBeforeDate") final Date createdBeforeDate);
+    Long getCountByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+                                          @Bind("createdBeforeDate") final Date createdBeforeDate);
+
+    @SqlQuery
+    Iterator<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
+                                                                 @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                                 @Bind("offset") final Long offset,
+                                                                 @Bind("rowCount") final Long rowCount);
 
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
index 2ac7ea1..535a623 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentDao.java
@@ -30,13 +30,13 @@ import org.killbill.billing.util.entity.Pagination;
 
 public interface PaymentDao {
 
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit);
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, final Long offset, final Long limit);
 
     public PaymentAttemptModelDao insertPaymentAttemptWithProperties(PaymentAttemptModelDao attempt, InternalCallContext context);
 
     public void updatePaymentAttempt(UUID paymentAttemptId, UUID transactionId, String state, InternalCallContext context);
 
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate);
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(String stateName, DateTime createdBeforeDate, final Long offset, final Long limit);
 
     public List<PaymentAttemptModelDao> getPaymentAttempts(String paymentExternalKey, InternalTenantContext context);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
index 9e9ed12..0f5475b 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
@@ -70,8 +70,8 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
     @SqlQuery
     @SmartFetchSize(shouldStream = true)
     public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
-                                                           @Bind("offset") final Long offset,
-                                                           @Bind("rowCount") final Long rowCount,
+                                                     @Bind("offset") final Long offset,
+                                                     @Bind("rowCount") final Long rowCount,
                                                            @BindBean final InternalTenantContext context);
 
     @SqlQuery
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index d291ca0..d31d589 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -19,6 +19,7 @@ package org.killbill.billing.payment.dao;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -52,10 +53,16 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
                                                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    List<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+    Long getCountByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
+                                                           @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                           @Bind("createdAfterDate") final Date createdAfterDate);
+
+    @SqlQuery
+    Iterator<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
                                                                                   @Bind("createdBeforeDate") final Date createdBeforeDate,
                                                                                   @Bind("createdAfterDate") final Date createdAfterDate,
-                                                                                  @Bind("limit") final int limit);
+                                                                                  @Bind("offset") final Long offset,
+                                                                                  @Bind("rowCount") final Long rowCount);
 
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index 5cf77f0..d003f74 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -20,7 +20,7 @@ package org.killbill.billing.payment.glue;
 
 import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.invoice.PaymentTagHandler;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -40,7 +40,7 @@ public class DefaultPaymentService implements PaymentService {
 
     public static final String SERVICE_NAME = "payment-service";
 
-    private final InvoiceHandler invoiceHandler;
+    private final PaymentBusEventHandler paymentBusEventHandler;
     private final PaymentTagHandler tagHandler;
     private final PersistentBus eventBus;
     private final PaymentApi api;
@@ -48,13 +48,13 @@ public class DefaultPaymentService implements PaymentService {
     private final Janitor janitor;
 
     @Inject
-    public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+    public DefaultPaymentService(final PaymentBusEventHandler paymentBusEventHandler,
                                  final PaymentTagHandler tagHandler,
                                  final PaymentApi api,
                                  final DefaultRetryService retryService,
                                  final PersistentBus eventBus,
                                  final Janitor janitor) {
-        this.invoiceHandler = invoiceHandler;
+        this.paymentBusEventHandler = paymentBusEventHandler;
         this.tagHandler = tagHandler;
         this.eventBus = eventBus;
         this.api = api;
@@ -70,12 +70,13 @@ public class DefaultPaymentService implements PaymentService {
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
     public void initialize() throws NotificationQueueAlreadyExists {
         try {
-            eventBus.register(invoiceHandler);
+            eventBus.register(paymentBusEventHandler);
             eventBus.register(tagHandler);
         } catch (final PersistentBus.EventBusException e) {
             log.error("Unable to register with the EventBus!", e);
         }
-        retryService.initialize(SERVICE_NAME);
+        retryService.initialize();
+        janitor.initialize();
     }
 
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
@@ -87,7 +88,7 @@ public class DefaultPaymentService implements PaymentService {
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() throws NoSuchNotificationQueue {
         try {
-            eventBus.unregister(invoiceHandler);
+            eventBus.unregister(paymentBusEventHandler);
             eventBus.unregister(tagHandler);
         } catch (final PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 8053d33..a602405 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -36,7 +36,7 @@ import org.killbill.billing.payment.api.DefaultPaymentGatewayApi;
 import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentGatewayApi;
 import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.core.PaymentGatewayProcessor;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
@@ -182,7 +182,7 @@ public class PaymentModule extends KillBillModule {
         bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
         bind(PaymentGatewayApi.class).to(DefaultPaymentGatewayApi.class).asEagerSingleton();
         bind(AdminPaymentApi.class).to(DefaultAdminPaymentApi.class).asEagerSingleton();
-        bind(InvoiceHandler.class).asEagerSingleton();
+        bind(PaymentBusEventHandler.class).asEagerSingleton();
         bind(PaymentTagHandler.class).asEagerSingleton();
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
diff --git a/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java b/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
index 792ae1c..3c1f13c 100644
--- a/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
+++ b/payment/src/main/java/org/killbill/billing/payment/invoice/PaymentTagHandler.java
@@ -35,6 +35,7 @@ import org.killbill.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
@@ -63,6 +64,8 @@ public class PaymentTagHandler {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
+    @AllowConcurrentEvents
     @Subscribe
     public void process_AUTO_PAY_OFF_removal(final ControlTagDeletionInternalEvent event) {
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
index 7370f39..a5ec4ce 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
@@ -44,10 +44,10 @@ import com.google.inject.Inject;
 public abstract class BaseRetryService implements RetryService {
 
     private static final Logger log = LoggerFactory.getLogger(BaseRetryService.class);
-    private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
 
     private final NotificationQueueService notificationQueueService;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final String paymentRetryService;
 
     private NotificationQueue retryQueue;
 
@@ -55,11 +55,12 @@ public abstract class BaseRetryService implements RetryService {
                             final InternalCallContextFactory internalCallContextFactory) {
         this.notificationQueueService = notificationQueueService;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
     }
 
     @Override
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
-        retryQueue = notificationQueueService.createNotificationQueue(svcName,
+    public void initialize() throws NotificationQueueAlreadyExists {
+        retryQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
                                                                       getQueueName(),
                                                                       new NotificationQueueHandler() {
                                                                           @Override
@@ -69,7 +70,7 @@ public abstract class BaseRetryService implements RetryService {
                                                                                   return;
                                                                               }
                                                                               final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
-                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                                                                               retryPaymentTransaction(key.getAttemptId(), key.getPaymentControlPluginNames(), callContext);
                                                                           }
                                                                       }
@@ -132,7 +133,8 @@ public abstract class BaseRetryService implements RetryService {
         }
 
         protected InternalCallContext createCallContextFromPaymentId(final ObjectType objectType, final UUID objectId, final Long tenantRecordId) {
-            return internalCallContextFactory.createInternalCallContext(objectId, objectType, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
+            final String paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
+            return internalCallContextFactory.createInternalCallContext(objectId, objectType, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
         }
 
         public abstract String getQueueName();
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
index ea320f1..03bdeee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
@@ -28,7 +28,7 @@ import org.killbill.notificationq.api.NotificationQueueService.NotificationQueue
 
 public interface RetryService {
 
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists;
+    public void initialize() throws NotificationQueueAlreadyExists;
 
     public void start();
 
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index c1f77a9..83860cc 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -73,6 +73,17 @@ where state_name = :stateName
 and created_date \< :createdBeforeDate
 <andCheckSoftDeletionWithComma("")>
 <defaultOrderBy()>
+limit :offset, :rowCount
+;
+>>
+
+getCountByStateNameAcrossTenants() ::= <<
+select
+count(1) as count
+from <tableName()>
+where state_name = :stateName
+and created_date \< :createdBeforeDate
+<andCheckSoftDeletionWithComma("")>
 ;
 >>
 
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index 1d54780..7d36552 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -90,7 +90,18 @@ created_date >= :createdAfterDate
 and created_date \< :createdBeforeDate
 and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
 <defaultOrderBy()>
-limit :limit
+limit :offset, :rowCount
+;
+>>
+
+getCountByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
+select
+count(1) as count
+from <tableName()>
+where
+created_date >= :createdAfterDate
+and created_date \< :createdBeforeDate
+and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
 ;
 >>
 
diff --git a/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml b/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
index db9d165..e6f68d2 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
+++ b/payment/src/main/resources/org/killbill/billing/payment/PaymentStates.xml
@@ -184,6 +184,7 @@
         <stateMachine name="REFUND">
             <states>
                 <state name="REFUND_INIT"/>
+                <state name="REFUND_PENDING"/>
                 <state name="REFUND_SUCCESS"/>
                 <state name="REFUND_FAILED"/>
                 <state name="REFUND_ERRORED"/>
@@ -204,6 +205,30 @@
                 <transition>
                     <initialState>REFUND_INIT</initialState>
                     <operation>OP_REFUND</operation>
+                    <operationResult>PENDING</operationResult>
+                    <finalState>REFUND_PENDING</finalState>
+                </transition>
+                <transition>
+                    <initialState>REFUND_PENDING</initialState>
+                    <operation>OP_REFUND</operation>
+                    <operationResult>SUCCESS</operationResult>
+                    <finalState>REFUND_SUCCESS</finalState>
+                </transition>
+                <transition>
+                    <initialState>REFUND_PENDING</initialState>
+                    <operation>OP_REFUND</operation>
+                    <operationResult>FAILURE</operationResult>
+                    <finalState>REFUND_FAILED</finalState>
+                </transition>
+                <transition>
+                    <initialState>REFUND_PENDING</initialState>
+                    <operation>OP_REFUND</operation>
+                    <operationResult>EXCEPTION</operationResult>
+                    <finalState>REFUND_ERRORED</finalState>
+                </transition>
+                <transition>
+                    <initialState>REFUND_INIT</initialState>
+                    <operation>OP_REFUND</operation>
                     <operationResult>EXCEPTION</operationResult>
                     <finalState>REFUND_ERRORED</finalState>
                 </transition>
@@ -216,6 +241,7 @@
         <stateMachine name="CREDIT">
             <states>
                 <state name="CREDIT_INIT"/>
+                <state name="CREDIT_PENDING"/>
                 <state name="CREDIT_SUCCESS"/>
                 <state name="CREDIT_FAILED"/>
                 <state name="CREDIT_ERRORED"/>
@@ -236,6 +262,30 @@
                 <transition>
                     <initialState>CREDIT_INIT</initialState>
                     <operation>OP_CREDIT</operation>
+                    <operationResult>PENDING</operationResult>
+                    <finalState>CREDIT_PENDING</finalState>
+                </transition>
+                <transition>
+                    <initialState>CREDIT_PENDING</initialState>
+                    <operation>OP_CREDIT</operation>
+                    <operationResult>SUCCESS</operationResult>
+                    <finalState>CREDIT_SUCCESS</finalState>
+                </transition>
+                <transition>
+                    <initialState>CREDIT_PENDING</initialState>
+                    <operation>OP_CREDIT</operation>
+                    <operationResult>FAILURE</operationResult>
+                    <finalState>CREDIT_FAILED</finalState>
+                </transition>
+                <transition>
+                    <initialState>CREDIT_PENDING</initialState>
+                    <operation>OP_CREDIT</operation>
+                    <operationResult>EXCEPTION</operationResult>
+                    <finalState>CREDIT_ERRORED</finalState>
+                </transition>
+                <transition>
+                    <initialState>CREDIT_INIT</initialState>
+                    <operation>OP_CREDIT</operation>
                     <operationResult>EXCEPTION</operationResult>
                     <finalState>CREDIT_ERRORED</finalState>
                 </transition>
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
index 809199f..257c157 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
@@ -34,7 +34,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
 
     @Test(groups = "fast")
     public void testPaymentErrorEvent() throws Exception {
-        final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
+        final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), BigDecimal.ONE, Currency.USD, TransactionStatus.SUCCESS, TransactionType.PURCHASE, new DateTime(), "no message", 1L, 2L, UUID.randomUUID());
         final String json = mapper.writeValueAsString(e);
 
         final Class<?> claz = Class.forName(DefaultPaymentErrorEvent.class.getName());
@@ -44,7 +44,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
 
     @Test(groups = "fast")
     public void testPaymentInfoEvent() throws Exception {
-        final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
+        final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
                                                                        TransactionType.PURCHASE, new DateTime(), 1L, 2L, UUID.randomUUID());
         final String json = mapper.writeValueAsString(e);
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java
new file mode 100644
index 0000000..ec7a4ee
--- /dev/null
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTask.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.payment.PaymentTestSuiteNoDB;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.inject.Inject;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+public class TestIncompletePaymentTransactionTask extends PaymentTestSuiteNoDB {
+
+    @Inject
+    protected IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+
+    @Test(groups = "fast")
+    public void testGetNextNotificationTime() {
+        assertNull(incompletePaymentTransactionTask.getNextNotificationTime(null));
+        final DateTime initTime = clock.getUTCNow();
+
+        // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
+        for (int i = 1; i < 10; i++) {
+            final DateTime nextTime = incompletePaymentTransactionTask.getNextNotificationTime(i);
+            assertNotNull(nextTime);
+            assertTrue(nextTime.compareTo(initTime) > 0);
+            if (i == 0) {
+                assertTrue(nextTime.compareTo(initTime.plusSeconds(3).plusSeconds(1)) < 0);
+            } else if (i == 1) {
+                assertTrue(nextTime.compareTo(initTime.plusMinutes(1).plusSeconds(1)) < 0);
+            } else if (i == 2) {
+                assertTrue(nextTime.compareTo(initTime.plusMinutes(3).plusSeconds(1)) < 0);
+            } else if (i == 3) {
+                assertTrue(nextTime.compareTo(initTime.plusHours(1).plusSeconds(1)) < 0);
+            } else if (i == 4) {
+                assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+            } else if (i == 5) {
+                assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+            } else if (i == 6) {
+                assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+            } else if (i == 7) {
+                assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+            } else if (i == 8) {
+                assertTrue(nextTime.compareTo(initTime.plusDays(1).plusSeconds(1)) < 0);
+            }
+        }
+        assertNull(incompletePaymentTransactionTask.getNextNotificationTime(10));
+    }
+}
\ No newline at end of file
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
index 21aee20..c761832 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/MockPaymentDao.java
@@ -36,6 +36,7 @@ import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.dao.MockNonEntityDao;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Pagination;
 
 import com.google.common.base.Predicate;
@@ -64,8 +65,8 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, int limit) {
-        return ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
+    public Pagination<PaymentTransactionModelDao> getByTransactionStatusAcrossTenants(final Iterable<TransactionStatus> transactionStatuses, DateTime createdBeforeDate, DateTime createdAfterDate, Long offset, Long limit) {
+        final List<PaymentTransactionModelDao> result=  ImmutableList.copyOf(Iterables.filter(transactions.values(), new Predicate<PaymentTransactionModelDao>() {
             @Override
             public boolean apply(final PaymentTransactionModelDao input) {
                 return Iterables.any(transactionStatuses, new Predicate<TransactionStatus>() {
@@ -76,6 +77,7 @@ public class MockPaymentDao implements PaymentDao {
                 });
             }
         }));
+        return new DefaultPagination<PaymentTransactionModelDao>(new Long(result.size()), result.iterator());
     }
 
     @Override
@@ -108,7 +110,7 @@ public class MockPaymentDao implements PaymentDao {
     }
 
     @Override
-    public List<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate) {
+    public Pagination<PaymentAttemptModelDao> getPaymentAttemptsByStateAcrossTenants(final String stateName, final DateTime createdBeforeDate, final Long offset, final Long limit) {
         return null;
     }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
index 358cf2b..9afbb45 100644
--- a/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
+++ b/payment/src/test/java/org/killbill/billing/payment/dao/TestPaymentDao.java
@@ -18,6 +18,7 @@ package org.killbill.billing.payment.dao;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
@@ -32,6 +33,7 @@ import org.killbill.billing.payment.dao.PluginPropertySerializer.PluginPropertyS
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.entity.Pagination;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -298,7 +300,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         final List<PaymentTransactionModelDao> result = getPendingTransactions(paymentModelDao.getId());
         Assert.assertEquals(result.size(), 3);
 
-        final List<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 3);
+        final Iterable<PaymentTransactionModelDao> transactions1 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), newTime, initialTime, 0L, 3L);
         for (PaymentTransactionModelDao paymentTransaction : transactions1) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -316,7 +318,7 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         }
         ;
 
-        final List<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 1);
+        final Iterable<PaymentTransactionModelDao> transactions2 = paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.PENDING), clock.getUTCNow(), initialTime, 0L, 1L);
         for (PaymentTransactionModelDao paymentTransaction : transactions2) {
             final String newPaymentState = "XXX_FAILED";
             paymentDao.updatePaymentAndTransactionOnCompletion(payment.getAccountId(), payment.getId(), paymentTransaction.getTransactionType(), newPaymentState, payment.getLastSuccessStateName(),
@@ -467,7 +469,46 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         assertEquals(result.size(), 2);
     }
 
+
     @Test(groups = "slow")
+    public void testPaginationForPaymentByStatesAcrossTenants() {
+        // Right before createdAfterDate, so should not be returned
+        final DateTime createdDate1 = clock.getUTCNow().minusHours(1);
+
+        final int NB_ENTRIES = 30;
+        for (int i = 0; i < NB_ENTRIES; i++) {
+            final PaymentModelDao paymentModelDao1 = new PaymentModelDao(createdDate1, createdDate1, UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID().toString());
+            final PaymentTransactionModelDao transaction1 = new PaymentTransactionModelDao(createdDate1, createdDate1, null, UUID.randomUUID().toString(),
+                                                                                           paymentModelDao1.getId(), TransactionType.AUTHORIZE, createdDate1,
+                                                                                           TransactionStatus.UNKNOWN, BigDecimal.TEN, Currency.AED,
+                                                                                           "unknown", "");
+
+            final InternalCallContext context1 = new InternalCallContext(1L,
+                                                                         1L,
+                                                                         internalCallContext.getUserToken(),
+                                                                         internalCallContext.getCreatedBy(),
+                                                                         internalCallContext.getCallOrigin(),
+                                                                         internalCallContext.getContextUserType(),
+                                                                         internalCallContext.getReasonCode(),
+                                                                         internalCallContext.getComments(),
+                                                                         createdDate1,
+                                                                         createdDate1);
+            paymentDao.insertPaymentWithFirstTransaction(paymentModelDao1, transaction1, context1);
+        }
+
+        final Pagination<PaymentTransactionModelDao> result =  paymentDao.getByTransactionStatusAcrossTenants(ImmutableList.of(TransactionStatus.UNKNOWN), clock.getUTCNow(), createdDate1, 0L, new Long(NB_ENTRIES));
+        Assert.assertEquals(result.getTotalNbRecords(), new Long(NB_ENTRIES));
+
+        final Iterator<PaymentTransactionModelDao> iterator = result.iterator();
+        for (int i = 0; i < NB_ENTRIES; i++) {
+            System.out.println("i = " + i);
+            Assert.assertTrue(iterator.hasNext());
+            final PaymentTransactionModelDao nextEntry = iterator.next();
+            Assert.assertEquals(nextEntry.getTransactionStatus(), TransactionStatus.UNKNOWN);
+        }
+    }
+
+        @Test(groups = "slow")
     public void testPaymentAttemptsByStateAcrossTenants() {
 
         final UUID paymentMethodId = UUID.randomUUID();
@@ -519,8 +560,8 @@ public class TestPaymentDao extends PaymentTestSuiteWithEmbeddedDB {
         paymentDao.insertPaymentAttemptWithProperties(attempt2, context2);
 
 
-        final List<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate);
-        Assert.assertEquals(result.size(), 2);
+        final Pagination<PaymentAttemptModelDao> result = paymentDao.getPaymentAttemptsByStateAcrossTenants(stateName, createdBeforeDate, 0L, 2L);
+        Assert.assertEquals(result.getTotalNbRecords().longValue(), 2L);
     }
 
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
index 7ad8a57..b7ba850 100644
--- a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
@@ -20,6 +20,7 @@ package org.killbill.billing.payment.glue;
 
 import org.killbill.billing.GuicyKillbillTestWithEmbeddedDBModule;
 import org.killbill.billing.account.glue.DefaultAccountModule;
+import org.killbill.billing.api.TestApiListener;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.util.glue.NonEntityDaoModule;
 import org.killbill.clock.Clock;
@@ -35,7 +36,7 @@ public class TestPaymentModuleWithEmbeddedDB extends TestPaymentModule {
         install(new GuicyKillbillTestWithEmbeddedDBModule(configSource));
         install(new NonEntityDaoModule(configSource));
         install(new DefaultAccountModule(configSource));
-
+        bind(TestApiListener.class).asEagerSingleton();
         super.configure();
     }
 }
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index bc68a36..cca3b59 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -23,13 +23,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
 
 import org.joda.time.LocalDate;
 import org.killbill.billing.account.api.Account;
+import org.killbill.billing.api.TestApiListener;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.invoice.api.Invoice;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.payment.api.DefaultPayment;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PaymentOptions;
@@ -37,13 +43,21 @@ import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
 import org.killbill.billing.payment.invoice.InvoicePaymentRoutingPluginApi;
 import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
 import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.profiling.Profiling;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
@@ -53,11 +67,16 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
@@ -75,6 +94,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
     @Inject
     private Janitor janitor;
+    @Inject
+    private PaymentBusEventHandler handler;
+    @Inject
+    protected TestApiListener testListener;
+    @Inject
+    protected InternalCallContextFactory internalCallContextFactory;
+    @Inject
+    protected NotificationQueueService notificationQueueService;
 
     private MockPaymentProviderPlugin mockPaymentProviderPlugin;
 
@@ -93,6 +120,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     protected void beforeClass() throws Exception {
         super.beforeClass();
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+        janitor.initialize();
         janitor.start();
     }
 
@@ -104,12 +132,17 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
+        eventBus.register(handler);
+        testListener.reset();
+        eventBus.register(testListener);
         mockPaymentProviderPlugin.clear();
         account = testHelper.createTestAccount("bobo@gmail.com", true);
     }
 
     @AfterMethod(groups = "slow")
     public void afterMethod() throws Exception {
+        eventBus.unregister(handler);
+        eventBus.unregister(testListener);
         super.afterMethod();
     }
 
@@ -233,20 +266,23 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         final String paymentExternalKey = "qwru";
         final String transactionExternalKey = "lkjdsf";
 
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
-        // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* , and that are not too old (less than 7 days)
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertNotificationsCompleted(internalCallContext, 5);
 
         final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
         assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
@@ -261,23 +297,26 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
         // Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
         mockPaymentProviderPlugin.makeNextPaymentFailWithError();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_ERROR);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
+        testListener.assertListenerStatus();
 
         final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
         Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
 
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertNotificationsCompleted(internalCallContext, 5);
 
         // Proves the Janitor ran (and updated the transaction)
         final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -299,26 +338,33 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         // Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
         mockPaymentProviderPlugin.makeNextPaymentFailWithException();
         try {
+            testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
             paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                            transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
         } catch (PaymentApiException ignore) {
+            testListener.assertListenerStatus();
         }
         final Payment payment = paymentApi.getPaymentByExternalKey(paymentExternalKey, false, ImmutableList.<PluginProperty>of(), callContext);
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
+
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        // NO because we will keep retrying as we can't fix it...
+        //assertNotificationsCompleted(internalCallContext, 5);
 
         final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
         Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
 
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+
 
         // Nothing new happened
         final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -326,31 +372,36 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     }
 
     @Test(groups = "slow")
-    public void testPendingEntries() throws PaymentApiException, EventBusException {
+    public void testPendingEntries() throws PaymentApiException, EventBusException, NoSuchNotificationQueue {
 
         final BigDecimal requestedAmount = BigDecimal.TEN;
         final String paymentExternalKey = "jhj44";
         final String transactionExternalKey = "4jhjj2";
 
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
+
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
 
         // Artificially move the transaction status to PENDING
         final String paymentStateName = paymentSMHelper.getPendingStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.PENDING, requestedAmount, account.getCurrency(),
                                                            "loup", "chat", internalCallContext);
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
 
+        assertNotificationsCompleted(internalCallContext, 5);
         final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
         Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
-
     }
 
+
     private List<PluginProperty> createPropertiesForInvoice(final Invoice invoice) {
         final List<PluginProperty> result = new ArrayList<PluginProperty>();
         result.add(new PluginProperty(InvoicePaymentRoutingPluginApi.PROP_IPCD_INVOICE_ID, invoice.getId().toString(), false));
@@ -385,5 +436,19 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
             }
         });
     }
+
+    private void assertNotificationsCompleted(final InternalCallContext internalCallContext, final long timeoutSec) {
+        try {
+            await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    final List<NotificationEventWithMetadata<NotificationEvent>> notifications = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+                    return notifications.isEmpty();
+                }
+            });
+        } catch (final Exception e) {
+            fail("Test failed ", e);
+        }
+    }
 }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index 2a1ae78..dcf9a9a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -70,7 +70,7 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
         ((MockPaymentDao) paymentDao).reset();
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
         mockPaymentProviderPlugin.clear();
-        retryService.initialize(DefaultPaymentService.SERVICE_NAME);
+        retryService.initialize();
         retryService.start();
 
     }
diff --git a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
index dfa6f2c..d821177 100644
--- a/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
+++ b/profiles/killbill/src/main/java/org/killbill/billing/server/notifications/PushNotificationListener.java
@@ -44,6 +44,7 @@ import com.ning.http.client.Response;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 
 public class PushNotificationListener {
@@ -70,6 +71,7 @@ public class PushNotificationListener {
         this.mapper = mapper;
     }
 
+    @AllowConcurrentEvents
     @Subscribe
     public void triggerPushNotifications(final ExtBusEvent event) {
         final TenantContext context = contextFactory.createTenantContext(event.getTenantId());
diff --git a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
index e0fb8bf..7b630c6 100644
--- a/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/org/killbill/billing/util/config/PaymentConfig.java
@@ -66,16 +66,10 @@ public interface PaymentConfig extends KillbillConfig {
     @Description("Delay before which unresolved attempt should be retried")
     public TimeSpan getIncompleteAttemptsTimeSpanDelay();
 
-    @Config("org.killbill.payment.janitor.transactions.delay")
-    @Default("3m")
+    @Config("org.killbill.payment.janitor.transactions.retries")
+    @Default("15s,1m,3m,1h,1d,1d,1d,1d,1d")
     @Description("Delay before which unresolved transactions should be retried")
-    public TimeSpan getIncompleteTransactionsTimeSpanDelay();
-
-
-    @Config("org.killbill.payment.janitor.transactions.giveup")
-    @Default("7d")
-    @Description("Delay after which unresolved transactions should be abandoned")
-    public TimeSpan getIncompleteTransactionsTimeSpanGiveup();
+    public List<TimeSpan> getIncompleteTransactionsRetries();
 
     @Config("org.killbill.payment.janitor.rate")
     @Default("1h")
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
index 48c23f1..1f9a9fd 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
@@ -18,6 +18,8 @@ package org.killbill.billing.util.entity.dao;
 
 import java.util.Iterator;
 
+import javax.annotation.Nullable;
+
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Entity;
@@ -35,7 +37,7 @@ public class DefaultPaginationSqlDaoHelper {
                                                                                                                      final PaginationIteratorBuilder<M, E, S> paginationIteratorBuilder,
                                                                                                                      final Long offset,
                                                                                                                      final Long limit,
-                                                                                                                     final InternalTenantContext context) {
+                                                                                                                     @Nullable final InternalTenantContext context) {
         // Note: the connection will be busy as we stream the results out: hence we cannot use
         // SQL_CALC_FOUND_ROWS / FOUND_ROWS on the actual query.
         // We still need to know the actual number of results, mainly for the UI so that it knows if it needs to fetch
@@ -51,7 +53,7 @@ public class DefaultPaginationSqlDaoHelper {
         // We usually always want to wrap our queries in an EntitySqlDaoTransactionWrapper... except here.
         // Since we want to stream the results out, we don't want to auto-commit when this method returns.
         final EntitySqlDao<M, E> sqlDao = transactionalSqlDao.onDemandForStreamingResults(sqlDaoClazz);
-        final Long totalCount = sqlDao.getCount(context);
+        final Long totalCount = context !=  null ? sqlDao.getCount(context) : null;
         final Iterator<M> results = paginationIteratorBuilder.build((S) sqlDao, limit, context);
 
         return new DefaultPagination<M>(offset, limit, count, totalCount, results);