killbill-aplcache

Attempt to fix #343 and #342 (re-opened) Code is in place and

7/10/2015 1:59:45 AM

Changes

Details

diff --git a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
index 3b5889f..9ef4716 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentErrorInternalEvent.java
@@ -17,9 +17,10 @@ package org.killbill.billing.events;
 
 import java.util.UUID;
 
+import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 
-public interface PaymentErrorInternalEvent extends BusInternalEvent {
+public interface PaymentErrorInternalEvent extends PaymentInternalEvent {
 
     public String getMessage();
 
@@ -27,5 +28,9 @@ public interface PaymentErrorInternalEvent extends BusInternalEvent {
 
     public UUID getPaymentId();
 
+    public UUID getPaymentTransactionId();
+
+    public TransactionStatus getStatus();
+
     public TransactionType getTransactionType();
 }
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
index f1265b9..4669d1d 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInfoInternalEvent.java
@@ -24,11 +24,7 @@ import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
 
-public interface PaymentInfoInternalEvent extends BusInternalEvent {
-
-    public UUID getPaymentId();
-
-    public UUID getAccountId();
+public interface PaymentInfoInternalEvent extends PaymentInternalEvent {
 
     public BigDecimal getAmount();
 
@@ -36,7 +32,4 @@ public interface PaymentInfoInternalEvent extends BusInternalEvent {
 
     public DateTime getEffectiveDate();
 
-    public TransactionStatus getStatus();
-
-    public TransactionType getTransactionType();
 }
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
new file mode 100644
index 0000000..6a442de
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/events/PaymentInternalEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.events;
+
+import java.util.UUID;
+
+import org.killbill.billing.payment.api.TransactionStatus;
+import org.killbill.billing.payment.api.TransactionType;
+
+public interface PaymentInternalEvent extends BusInternalEvent {
+
+    public UUID getAccountId();
+
+    public UUID getPaymentId();
+
+    public UUID getPaymentTransactionId();
+
+    public TransactionType getTransactionType();
+
+    public TransactionStatus getStatus();
+
+}
diff --git a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
index a0198cc..be1a80b 100644
--- a/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
+++ b/api/src/main/java/org/killbill/billing/events/PaymentPluginErrorInternalEvent.java
@@ -13,19 +13,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package org.killbill.billing.events;
-
-import java.util.UUID;
 
-import org.killbill.billing.payment.api.TransactionType;
-
-public interface PaymentPluginErrorInternalEvent extends BusInternalEvent {
+package org.killbill.billing.events;
 
+public interface PaymentPluginErrorInternalEvent extends PaymentInternalEvent {
     public String getMessage();
-
-    public UUID getAccountId();
-
-    public UUID getPaymentId();
-
-    public TransactionType getTransactionType();
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
index d81ebad..030eb96 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentErrorEvent.java
@@ -30,11 +30,15 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
     private final String message;
     private final UUID accountId;
     private final UUID paymentId;
+    private final UUID paymentTransactionId;
+    private final TransactionStatus status;
     private final TransactionType transactionType;
 
     @JsonCreator
     public DefaultPaymentErrorEvent(@JsonProperty("accountId") final UUID accountId,
                                     @JsonProperty("paymentId") final UUID paymentId,
+                                    @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+                                    @JsonProperty("status") final TransactionStatus status,
                                     @JsonProperty("transactionType")  final TransactionType transactionType,
                                     @JsonProperty("message") final String message,
                                     @JsonProperty("searchKey1") final Long searchKey1,
@@ -44,6 +48,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
         this.message = message;
         this.accountId = accountId;
         this.paymentId = paymentId;
+        this.paymentTransactionId = paymentTransactionId;
+        this.status = status;
         this.transactionType = transactionType;
     }
 
@@ -67,6 +73,16 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
         return transactionType;
     }
 
+    @Override
+    public UUID getPaymentTransactionId() {
+        return paymentTransactionId;
+    }
+
+    @Override
+    public TransactionStatus getStatus() {
+        return status;
+    }
+
     @JsonIgnore
     @Override
     public BusInternalEventType getBusEventType() {
@@ -79,6 +95,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
         sb.append("message='").append(message).append('\'');
         sb.append(", accountId=").append(accountId);
         sb.append(", paymentId=").append(paymentId);
+        sb.append(", paymentTransactionId=").append(paymentTransactionId);
+        sb.append(", status=").append(status);
         sb.append(", transactionType=").append(transactionType);
         sb.append('}');
         return sb.toString();
@@ -107,7 +125,12 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
         if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
             return false;
         }
-
+        if (paymentTransactionId != null ? !paymentTransactionId.equals(that.paymentTransactionId) : that.paymentTransactionId != null) {
+            return false;
+        }
+        if (status != null ? !status.equals(that.status) : that.status != null) {
+            return false;
+        }
         return true;
     }
 
@@ -117,6 +140,8 @@ public class DefaultPaymentErrorEvent extends BusEventBase implements PaymentErr
         result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
         result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
         result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
+        result = 31 * result + (paymentTransactionId != null ? paymentTransactionId.hashCode() : 0);
+        result = 31 * result + (status != null ? status.hashCode() : 0);
         return result;
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
index d3d73b2..9dd0eb5 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentInfoEvent.java
@@ -32,6 +32,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
 
     private final UUID accountId;
     private final UUID paymentId;
+    private final UUID paymentTransactionId;
     private final BigDecimal amount;
     private final Currency currency;
     private final TransactionStatus status;
@@ -41,6 +42,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
     @JsonCreator
     public DefaultPaymentInfoEvent(@JsonProperty("accountId") final UUID accountId,
                                    @JsonProperty("paymentId") final UUID paymentId,
+                                   @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
                                    @JsonProperty("amount") final BigDecimal amount,
                                    @JsonProperty("currency") final Currency currency,
                                    @JsonProperty("status") final TransactionStatus status,
@@ -54,6 +56,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
         super(searchKey1, searchKey2, userToken);
         this.accountId = accountId;
         this.paymentId = paymentId;
+        this.paymentTransactionId = paymentTransactionId;
         this.amount = amount;
         this.currency = currency;
         this.status = status;
@@ -63,6 +66,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
 
     public DefaultPaymentInfoEvent(final UUID accountId,
                                    final UUID paymentId,
+                                   final UUID paymentTransactionId,
                                    final BigDecimal amount,
                                    final Currency currency,
                                    final TransactionStatus status,
@@ -71,7 +75,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
                                    final Long searchKey1,
                                    final Long searchKey2,
                                    final UUID userToken) {
-        this(accountId, paymentId, amount, currency, status, transactionType, null, null,
+        this(accountId, paymentId, paymentTransactionId, amount, currency, status, transactionType, null, null,
              effectiveDate, searchKey1, searchKey2, userToken);
     }
 
@@ -108,6 +112,11 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
     }
 
     @Override
+    public UUID getPaymentTransactionId() {
+        return paymentTransactionId;
+    }
+
+    @Override
     public TransactionType getTransactionType() {
         return transactionType;
     }
@@ -123,6 +132,7 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
         sb.append("DefaultPaymentInfoEvent");
         sb.append("{accountId=").append(accountId);
         sb.append(", paymentId=").append(paymentId);
+        sb.append(", paymentTransactionId=").append(paymentTransactionId);
         sb.append(", amount=").append(amount);
         sb.append(", currency=").append(currency);
         sb.append(", status=").append(status);
@@ -144,6 +154,8 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
         result = prime * result
                  + ((paymentId == null) ? 0 : paymentId.hashCode());
         result = prime * result
+                 + ((paymentTransactionId == null) ? 0 : paymentTransactionId.hashCode());
+        result = prime * result
                  + ((currency == null) ? 0 : currency.hashCode());
         result = prime * result + ((status == null) ? 0 : status.hashCode());
         return result;
@@ -196,6 +208,13 @@ public class DefaultPaymentInfoEvent extends BusEventBase implements PaymentInfo
         } else if (!paymentId.equals(other.paymentId)) {
             return false;
         }
+        if (paymentTransactionId == null) {
+            if (other.paymentTransactionId != null) {
+                return false;
+            }
+        } else if (!paymentTransactionId.equals(other.paymentTransactionId)) {
+            return false;
+        }
         if (currency == null) {
             if (other.currency != null) {
                 return false;
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
index 5ef1ac2..10c1fc3 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentPluginErrorEvent.java
@@ -29,11 +29,15 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
     private final String message;
     private final UUID accountId;
     private final UUID paymentId;
+    private final UUID paymentTransactionId;
+    private final TransactionStatus status;
     private final TransactionType transactionType;
 
     @JsonCreator
     public DefaultPaymentPluginErrorEvent(@JsonProperty("accountId") final UUID accountId,
                                           @JsonProperty("paymentId") final UUID paymentId,
+                                          @JsonProperty("paymentTransactionId") final UUID paymentTransactionId,
+                                          @JsonProperty("status") final TransactionStatus status,
                                           @JsonProperty("transactionType")  final TransactionType transactionType,
                                           @JsonProperty("message") final String message,
                                           @JsonProperty("searchKey1") final Long searchKey1,
@@ -43,6 +47,8 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
         this.message = message;
         this.accountId = accountId;
         this.paymentId = paymentId;
+        this.paymentTransactionId = paymentTransactionId;
+        this.status = status;
         this.transactionType = transactionType;
     }
 
@@ -66,6 +72,16 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
         return transactionType;
     }
 
+    @Override
+    public UUID getPaymentTransactionId() {
+        return paymentTransactionId;
+    }
+
+    @Override
+    public TransactionStatus getStatus() {
+        return status;
+    }
+
     @JsonIgnore
     @Override
     public BusInternalEventType getBusEventType() {
@@ -95,6 +111,12 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
         if (paymentId != null ? !paymentId.equals(that.paymentId) : that.paymentId != null) {
             return false;
         }
+        if (paymentTransactionId != null ? !paymentTransactionId.equals(that.paymentTransactionId) : that.paymentTransactionId != null) {
+            return false;
+        }
+        if (status != null ? !status.equals(that.status) : that.status != null) {
+            return false;
+        }
 
         return true;
     }
@@ -105,6 +127,8 @@ public class DefaultPaymentPluginErrorEvent extends BusEventBase implements Paym
         result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
         result = 31 * result + (transactionType != null ? transactionType.hashCode() : 0);
         result = 31 * result + (paymentId != null ? paymentId.hashCode() : 0);
+        result = 31 * result + (paymentTransactionId != null ? paymentTransactionId.hashCode() : 0);
+        result = 31 * result + (status != null ? status.hashCode() : 0);
         return result;
     }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
index 7df99f1..60281b4 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/CompletionTaskBase.java
@@ -17,6 +17,7 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.killbill.billing.account.api.Account;
@@ -24,6 +25,7 @@ import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.core.ProcessorBase;
 import org.killbill.billing.payment.core.sm.PaymentControlStateMachineHelper;
@@ -42,6 +44,7 @@ import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLock;
 import org.killbill.commons.locker.GlobalLocker;
 import org.killbill.commons.locker.LockFailedException;
+import org.killbill.notificationq.api.NotificationQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +62,8 @@ abstract class CompletionTaskBase<T> implements Runnable {
     protected final OSGIServiceRegistration<PaymentPluginApi> pluginRegistry;
     protected final GlobalLocker locker;
 
+    protected NotificationQueue janitorQueue;
+
     private volatile boolean isStopped;
 
     public CompletionTaskBase(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
@@ -105,8 +110,14 @@ abstract class CompletionTaskBase<T> implements Runnable {
 
     public abstract void doIteration(final T item);
 
+    public abstract void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException;
+
+    public void attachJanitorQueue(final NotificationQueue janitorQueue) {
+        this.janitorQueue = janitorQueue;
+    }
+
     public interface JanitorIterationCallback {
-        public <T>  T doIteration();
+        public <T> T doIteration();
     }
 
     protected <T> T doJanitorOperationWithAccountLock(final JanitorIterationCallback callback, final InternalTenantContext internalTenantContext) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
index a02fb1a..d39f831 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentAttemptTask.java
@@ -17,6 +17,7 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Inject;
@@ -27,6 +28,7 @@ import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.TransactionStatus;
@@ -46,6 +48,7 @@ import org.killbill.billing.util.config.PaymentConfig;
 import org.killbill.billing.util.entity.Pagination;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationQueue;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -142,6 +145,11 @@ public class IncompletePaymentAttemptTask extends CompletionTaskBase<PaymentAtte
         }
     }
 
+    @Override
+    public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException {
+        // Nothing
+    }
+
     private DateTime getCreatedDateBefore() {
         final long delayBeforeNowMs = paymentConfig.getIncompleteAttemptsTimeSpanDelay().getMillis();
         return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
index 2779e21..99d4725 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/IncompletePaymentTransactionTask.java
@@ -17,9 +17,12 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.UUID;
 
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
@@ -27,6 +30,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.events.PaymentInternalEvent;
 import org.killbill.billing.osgi.api.OSGIServiceRegistration;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
@@ -45,9 +49,10 @@ import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.config.PaymentConfig;
-import org.killbill.billing.util.entity.Pagination;
 import org.killbill.clock.Clock;
 import org.killbill.commons.locker.GlobalLocker;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -57,11 +62,26 @@ import com.google.common.collect.Iterables;
 
 public class IncompletePaymentTransactionTask extends CompletionTaskBase<PaymentTransactionModelDao> {
 
+    private final static long MILLIS_TO_SEC = 1000L;
+    private final static long SEC_TO_HOUR = 3600L;
+    private final static long HOURS_TO_DAY = 24L;
+
+    private final static List<Long> RETRY_ATTEMPTS = ImmutableList.<Long>of((3L * MILLIS_TO_SEC), // 3 min
+                                                                            (10L * MILLIS_TO_SEC), // 10 min
+                                                                            (1L * SEC_TO_HOUR * MILLIS_TO_SEC), // 1 hour
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC), // 7 times every day
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC),
+                                                                            (HOURS_TO_DAY * SEC_TO_HOUR * MILLIS_TO_SEC));
 
     private static final ImmutableList<TransactionStatus> TRANSACTION_STATUSES_TO_CONSIDER = ImmutableList.<TransactionStatus>builder()
                                                                                                           .add(TransactionStatus.PENDING)
                                                                                                           .add(TransactionStatus.UNKNOWN)
                                                                                                           .build();
+
     @Inject
     public IncompletePaymentTransactionTask(final InternalCallContextFactory internalCallContextFactory, final PaymentConfig paymentConfig,
                                             final PaymentDao paymentDao, final Clock clock,
@@ -72,24 +92,24 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
 
     @Override
     public Iterable<PaymentTransactionModelDao> getItemsForIteration() {
-        // Code will go away in the next commit -- allow to fix h2...
-        final Pagination<PaymentTransactionModelDao> result = paymentDao.getByTransactionStatusAcrossTenants(TRANSACTION_STATUSES_TO_CONSIDER, getCreatedDateBefore(), getCreatedDateAfter(), 0L, 1000L);
-        if (result.getTotalNbRecords() > 0) {
-            log.info("Janitor IncompletePaymentTransactionTask start run: found {} pending/unknown payments", result.getTotalNbRecords());
-        }
-        return result;
+        // This is not triggered by Janitor proper but instead relies on bus event + notificationQ
+        return ImmutableList.of();
     }
 
     @Override
     public void doIteration(final PaymentTransactionModelDao paymentTransaction) {
+        // Nothing
+    }
+
+    public void processNotification(final JanitorNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final long tenantRecordId) {
 
-        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(paymentTransaction.getTenantRecordId(), paymentTransaction.getAccountRecordId());
+        final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId);
         doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
             @Override
             public Void doIteration() {
 
                 // State may have changed since we originally retrieved with no lock
-                final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(paymentTransaction.getId(), internalTenantContext);
+                final PaymentTransactionModelDao rehydratedPaymentTransaction = paymentDao.getPaymentTransaction(notificationKey.getUuidKey(), internalTenantContext);
 
                 final TenantContext tenantContext = internalCallContextFactory.createTenantContext(internalTenantContext);
                 final PaymentModelDao payment = paymentDao.getPayment(rehydratedPaymentTransaction.getPaymentId(), internalTenantContext);
@@ -123,37 +143,45 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                 } catch (final Exception e) {
                     paymentTransactionInfoPlugin = undefinedPaymentTransaction;
                 }
-                updatePaymentAndTransactionIfNeeded(payment, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+                updatePaymentAndTransactionIfNeeded(payment, notificationKey.getAttemptNumber(), userToken, rehydratedPaymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
                 return null;
             }
         }, internalTenantContext);
 
     }
 
+    @Override
+    public void processPaymentEvent(final PaymentInternalEvent event, final NotificationQueue janitorQueue) throws IOException {
+        if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(event.getStatus())) {
+            return;
+        }
+        insertNewNotificationForUnresolvedTransactionIfNeeded(event.getPaymentTransactionId(), 1, event.getUserToken(), event.getSearchKey1(), event.getSearchKey2());
+    }
+
     public boolean updatePaymentAndTransactionIfNeededWithAccountLock(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
         // Can happen in the GET case, see PaymentProcessor#toPayment
         if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
             // Nothing to do
             return false;
         }
-        final Boolean result =  doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
+        final Boolean result = doJanitorOperationWithAccountLock(new JanitorIterationCallback() {
             @Override
             public Boolean doIteration() {
-                return updatePaymentAndTransactionInternal(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+                return updatePaymentAndTransactionInternal(payment, null, null, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
             }
         }, internalTenantContext);
         return result != null && result;
     }
 
-    private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+    private boolean updatePaymentAndTransactionIfNeeded(final PaymentModelDao payment, final int attemptNumber, final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
         if (!TRANSACTION_STATUSES_TO_CONSIDER.contains(paymentTransaction.getTransactionStatus())) {
             // Nothing to do
             return false;
         }
-        return updatePaymentAndTransactionInternal(payment, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
+        return updatePaymentAndTransactionInternal(payment, attemptNumber, userToken, paymentTransaction, paymentTransactionInfoPlugin, internalTenantContext);
     }
 
-    private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
+    private boolean updatePaymentAndTransactionInternal(final PaymentModelDao payment, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final PaymentTransactionModelDao paymentTransaction, final PaymentTransactionInfoPlugin paymentTransactionInfoPlugin, final InternalTenantContext internalTenantContext) {
         final CallContext callContext = createCallContext("IncompletePaymentTransactionTask", internalTenantContext);
 
         // First obtain the new transactionStatus,
@@ -176,6 +204,7 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
                 log.info("Janitor IncompletePaymentTransactionTask unable to repair payment {}, transaction {}: {} -> {}",
                          payment.getId(), paymentTransaction.getId(), paymentTransaction.getTransactionStatus(), transactionStatus);
                 // We can't get anything interesting from the plugin...
+                insertNewNotificationForUnresolvedTransactionIfNeeded(paymentTransaction.getId(), attemptNumber, userToken, internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
                 return false;
         }
 
@@ -203,7 +232,6 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
 
     }
 
-
     // Keep the existing currentTransactionStatus if we can't obtain a better answer from the plugin; if not, return the newTransactionStatus
     private TransactionStatus computeNewTransactionStatusFromPaymentTransactionInfoPlugin(final PaymentTransactionInfoPlugin input, final TransactionStatus currentTransactionStatus) {
         final TransactionStatus newTransactionStatus = PaymentTransactionInfoPluginConverter.toTransactionStatus(input);
@@ -222,6 +250,26 @@ public class IncompletePaymentTransactionTask extends CompletionTaskBase<Payment
         return pluginApi;
     }
 
+    private DateTime getNextNotificationTime(@Nullable final Integer attemptNumber) {
+        if (attemptNumber == null || attemptNumber > RETRY_ATTEMPTS.size()) {
+            return null;
+        }
+        final long nextDelay = RETRY_ATTEMPTS.get(attemptNumber - 1);
+        return clock.getUTCNow().plusMillis((int) nextDelay);
+    }
+
+    private void insertNewNotificationForUnresolvedTransactionIfNeeded(final UUID paymentTransactionId, @Nullable final Integer attemptNumber, @Nullable final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final NotificationEvent key = new JanitorNotificationKey(paymentTransactionId, IncompletePaymentTransactionTask.class.toString(), attemptNumber);
+        final DateTime notificationTime = getNextNotificationTime(attemptNumber);
+        if (notificationTime != null) {
+            try {
+                janitorQueue.recordFutureNotification(notificationTime, key, userToken, accountRecordId, tenantRecordId);
+            } catch (IOException e) {
+                log.warn("Janitor IncompletePaymentTransactionTask : Failed to insert future notification for paymentTransactionId = {}: {}", paymentTransactionId, e.getMessage());
+            }
+        }
+    }
+
     private DateTime getCreatedDateBefore() {
         final long delayBeforeNowMs = paymentConfig.getIncompleteTransactionsTimeSpanDelay().getMillis();
         return clock.getUTCNow().minusMillis((int) delayBeforeNowMs);
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
index 032b1a0..8ca2037 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/Janitor.java
@@ -17,14 +17,28 @@
 
 package org.killbill.billing.payment.core.janitor;
 
+import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.joda.time.DateTime;
+import org.killbill.billing.events.PaymentErrorInternalEvent;
+import org.killbill.billing.events.PaymentInfoInternalEvent;
+import org.killbill.billing.events.PaymentInternalEvent;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
 import org.killbill.billing.payment.glue.PaymentModule;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.PaymentConfig;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,32 +50,65 @@ public class Janitor {
     private static final Logger log = LoggerFactory.getLogger(Janitor.class);
 
     private static final int TERMINATION_TIMEOUT_SEC = 5;
+    public static final String QUEUE_NAME = "janitor";
 
+    private final NotificationQueueService notificationQueueService;
     private final ScheduledExecutorService janitorExecutor;
     private final PaymentConfig paymentConfig;
     private final IncompletePaymentAttemptTask incompletePaymentAttemptTask;
     private final IncompletePaymentTransactionTask incompletePaymentTransactionTask;
+    private final InternalCallContextFactory internalCallContextFactory;
+
+    private NotificationQueue janitorQueue;
 
     private volatile boolean isStopped;
 
     @Inject
     public Janitor(final PaymentConfig paymentConfig,
+                   final NotificationQueueService notificationQueueService,
                    @Named(PaymentModule.JANITOR_EXECUTOR_NAMED) final ScheduledExecutorService janitorExecutor,
                    final IncompletePaymentAttemptTask incompletePaymentAttemptTask,
-                   final IncompletePaymentTransactionTask incompletePaymentTransactionTask) {
+                   final IncompletePaymentTransactionTask incompletePaymentTransactionTask,
+                   final InternalCallContextFactory internalCallContextFactory) {
+        this.notificationQueueService = notificationQueueService;
         this.janitorExecutor = janitorExecutor;
         this.paymentConfig = paymentConfig;
         this.incompletePaymentAttemptTask = incompletePaymentAttemptTask;
         this.incompletePaymentTransactionTask = incompletePaymentTransactionTask;
+        this.internalCallContextFactory = internalCallContextFactory;
         this.isStopped = false;
     }
 
+    public void initialize() throws NotificationQueueAlreadyExists {
+        janitorQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
+                                                                        QUEUE_NAME,
+                                                                        new NotificationQueueHandler() {
+                                                                            @Override
+                                                                            public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                if (! (notificationKey instanceof JanitorNotificationKey)) {
+                                                                                    log.error("Janitor service received an unexpected event type {}" + notificationKey.getClass().getName());
+                                                                                    return;
+
+                                                                                }
+                                                                                final JanitorNotificationKey janitorKey = (JanitorNotificationKey) notificationKey;
+                                                                                if (janitorKey.getTaskName().equals(incompletePaymentTransactionTask.getClass().toString())) {
+                                                                                    incompletePaymentTransactionTask.processNotification(janitorKey, userToken, accountRecordId, tenantRecordId);
+                                                                                }
+                                                                            }
+                                                                        }
+                                                                       );
+        incompletePaymentTransactionTask.attachJanitorQueue(janitorQueue);
+        incompletePaymentAttemptTask.attachJanitorQueue(janitorQueue);
+    }
+
     public void start() {
         if (isStopped) {
             log.warn("Janitor is not a restartable service, and was already started, aborting");
             return;
         }
 
+        janitorQueue.startQueue();
+
         // Start task for completing incomplete payment attempts
         final TimeUnit attemptCompletionRateUnit = paymentConfig.getJanitorRunningRate().getUnit();
         final long attemptCompletionPeriod = paymentConfig.getJanitorRunningRate().getPeriod();
@@ -73,7 +120,7 @@ public class Janitor {
         janitorExecutor.scheduleAtFixedRate(incompletePaymentTransactionTask, erroredCompletionPeriod, erroredCompletionPeriod, erroredCompletionRateUnit);
     }
 
-    public void stop() {
+    public void stop() throws NoSuchNotificationQueue {
         if (isStopped) {
             log.warn("Janitor is already in a stopped state");
             return;
@@ -93,6 +140,11 @@ public class Janitor {
             if (!success) {
                 log.warn("Janitor failed to complete termination within " + TERMINATION_TIMEOUT_SEC + "sec");
             }
+
+            if (janitorQueue != null) {
+                janitorQueue.stopQueue();
+                notificationQueueService.deleteNotificationQueue(DefaultPaymentService.SERVICE_NAME, QUEUE_NAME);
+            }
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             log.warn("Janitor stop sequence got interrupted");
@@ -100,4 +152,9 @@ public class Janitor {
             isStopped = true;
         }
     }
+
+    public void processPaymentEvent(final PaymentInternalEvent event) throws IOException {
+        incompletePaymentAttemptTask.processPaymentEvent(event, janitorQueue);
+        incompletePaymentTransactionTask.processPaymentEvent(event, janitorQueue);
+    }
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
new file mode 100644
index 0000000..b0f2c1d
--- /dev/null
+++ b/payment/src/main/java/org/killbill/billing/payment/core/janitor/JanitorNotificationKey.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.payment.core.janitor;
+
+import java.util.UUID;
+
+import org.killbill.notificationq.DefaultUUIDNotificationKey;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class JanitorNotificationKey extends DefaultUUIDNotificationKey {
+
+    private final Integer attemptNumber;
+    private final String taskName;
+
+    @JsonCreator
+    public JanitorNotificationKey(@JsonProperty("uuidKey") final UUID uuidKey,
+                                  @JsonProperty("taskName") final String taskName,
+                                  @JsonProperty("attemptNumber") final Integer attemptNumber) {
+        super(uuidKey);
+        this.attemptNumber = attemptNumber;
+        this.taskName = taskName;
+    }
+
+    public Integer getAttemptNumber() {
+        return attemptNumber;
+    }
+
+    public String getTaskName() {
+        return taskName;
+    }
+}
\ No newline at end of file
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
index 5b340aa..3dc675a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/sm/payments/PaymentEnteringStateCallback.java
@@ -65,6 +65,8 @@ public abstract class PaymentEnteringStateCallback implements EnteringStateCallb
             //
             final BusInternalEvent event = new DefaultPaymentErrorEvent(paymentStateContext.getAccount().getId(),
                                                                         null,
+                                                                        null,
+                                                                        null,
                                                                         paymentStateContext.getTransactionType(),
                                                                         "Early abortion of payment transaction",
                                                                         paymentStateContext.getInternalCallContext().getAccountRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 0921bea..33ab485 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -301,7 +301,7 @@ public class DefaultPaymentDao implements PaymentDao {
                 } else {
                     entitySqlDaoWrapperFactory.become(PaymentSqlDao.class).updatePaymentStateName(paymentId.toString(), currentPaymentStateName, context);
                 }
-                postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
+                postPaymentEventFromTransaction(accountId, transactionStatus, transactionType, paymentId, transactionId, processedAmount, processedCurrency, clock.getUTCNow(), gatewayErrorCode, entitySqlDaoWrapperFactory, context);
                 return null;
             }
         });
@@ -561,6 +561,7 @@ public class DefaultPaymentDao implements PaymentDao {
                                                  final TransactionStatus transactionStatus,
                                                  final TransactionType transactionType,
                                                  final UUID paymentId,
+                                                 final UUID transactionId,
                                                  final BigDecimal processedAmount,
                                                  final Currency processedCurrency,
                                                  final DateTime effectiveDate,
@@ -574,6 +575,7 @@ public class DefaultPaymentDao implements PaymentDao {
             case PENDING:
                 event = new DefaultPaymentInfoEvent(accountId,
                                                     paymentId,
+                                                    transactionId,
                                                     processedAmount,
                                                     processedCurrency,
                                                     transactionStatus,
@@ -587,6 +589,8 @@ public class DefaultPaymentDao implements PaymentDao {
             case PAYMENT_FAILURE:
                 event = new DefaultPaymentErrorEvent(accountId,
                                                      paymentId,
+                                                     transactionId,
+                                                     transactionStatus,
                                                      transactionType,
                                                      gatewayErrorCode,
                                                      context.getAccountRecordId(),
@@ -598,6 +602,8 @@ public class DefaultPaymentDao implements PaymentDao {
             default:
                 event = new DefaultPaymentPluginErrorEvent(accountId,
                                                            paymentId,
+                                                           transactionId,
+                                                           transactionStatus,
                                                            transactionType,
                                                            gatewayErrorCode,
                                                            context.getAccountRecordId(),
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
index 5cf77f0..d003f74 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/DefaultPaymentService.java
@@ -20,7 +20,7 @@ package org.killbill.billing.payment.glue;
 
 import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.invoice.PaymentTagHandler;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.retry.DefaultRetryService;
@@ -40,7 +40,7 @@ public class DefaultPaymentService implements PaymentService {
 
     public static final String SERVICE_NAME = "payment-service";
 
-    private final InvoiceHandler invoiceHandler;
+    private final PaymentBusEventHandler paymentBusEventHandler;
     private final PaymentTagHandler tagHandler;
     private final PersistentBus eventBus;
     private final PaymentApi api;
@@ -48,13 +48,13 @@ public class DefaultPaymentService implements PaymentService {
     private final Janitor janitor;
 
     @Inject
-    public DefaultPaymentService(final InvoiceHandler invoiceHandler,
+    public DefaultPaymentService(final PaymentBusEventHandler paymentBusEventHandler,
                                  final PaymentTagHandler tagHandler,
                                  final PaymentApi api,
                                  final DefaultRetryService retryService,
                                  final PersistentBus eventBus,
                                  final Janitor janitor) {
-        this.invoiceHandler = invoiceHandler;
+        this.paymentBusEventHandler = paymentBusEventHandler;
         this.tagHandler = tagHandler;
         this.eventBus = eventBus;
         this.api = api;
@@ -70,12 +70,13 @@ public class DefaultPaymentService implements PaymentService {
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
     public void initialize() throws NotificationQueueAlreadyExists {
         try {
-            eventBus.register(invoiceHandler);
+            eventBus.register(paymentBusEventHandler);
             eventBus.register(tagHandler);
         } catch (final PersistentBus.EventBusException e) {
             log.error("Unable to register with the EventBus!", e);
         }
-        retryService.initialize(SERVICE_NAME);
+        retryService.initialize();
+        janitor.initialize();
     }
 
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
@@ -87,7 +88,7 @@ public class DefaultPaymentService implements PaymentService {
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() throws NoSuchNotificationQueue {
         try {
-            eventBus.unregister(invoiceHandler);
+            eventBus.unregister(paymentBusEventHandler);
             eventBus.unregister(tagHandler);
         } catch (final PersistentBus.EventBusException e) {
             throw new RuntimeException("Unable to unregister to the EventBus!", e);
diff --git a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
index 8053d33..a602405 100644
--- a/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
+++ b/payment/src/main/java/org/killbill/billing/payment/glue/PaymentModule.java
@@ -36,7 +36,7 @@ import org.killbill.billing.payment.api.DefaultPaymentGatewayApi;
 import org.killbill.billing.payment.api.PaymentApi;
 import org.killbill.billing.payment.api.PaymentGatewayApi;
 import org.killbill.billing.payment.api.PaymentService;
-import org.killbill.billing.payment.bus.InvoiceHandler;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.core.PaymentGatewayProcessor;
 import org.killbill.billing.payment.core.PaymentMethodProcessor;
 import org.killbill.billing.payment.core.PaymentProcessor;
@@ -182,7 +182,7 @@ public class PaymentModule extends KillBillModule {
         bind(PaymentApi.class).to(DefaultPaymentApi.class).asEagerSingleton();
         bind(PaymentGatewayApi.class).to(DefaultPaymentGatewayApi.class).asEagerSingleton();
         bind(AdminPaymentApi.class).to(DefaultAdminPaymentApi.class).asEagerSingleton();
-        bind(InvoiceHandler.class).asEagerSingleton();
+        bind(PaymentBusEventHandler.class).asEagerSingleton();
         bind(PaymentTagHandler.class).asEagerSingleton();
         bind(PaymentService.class).to(DefaultPaymentService.class).asEagerSingleton();
         installPaymentProviderPlugins(paymentConfig);
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
index 7370f39..a5ec4ce 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/BaseRetryService.java
@@ -44,10 +44,10 @@ import com.google.inject.Inject;
 public abstract class BaseRetryService implements RetryService {
 
     private static final Logger log = LoggerFactory.getLogger(BaseRetryService.class);
-    private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
 
     private final NotificationQueueService notificationQueueService;
     private final InternalCallContextFactory internalCallContextFactory;
+    private final String paymentRetryService;
 
     private NotificationQueue retryQueue;
 
@@ -55,11 +55,12 @@ public abstract class BaseRetryService implements RetryService {
                             final InternalCallContextFactory internalCallContextFactory) {
         this.notificationQueueService = notificationQueueService;
         this.internalCallContextFactory = internalCallContextFactory;
+        this.paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
     }
 
     @Override
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
-        retryQueue = notificationQueueService.createNotificationQueue(svcName,
+    public void initialize() throws NotificationQueueAlreadyExists {
+        retryQueue = notificationQueueService.createNotificationQueue(DefaultPaymentService.SERVICE_NAME,
                                                                       getQueueName(),
                                                                       new NotificationQueueHandler() {
                                                                           @Override
@@ -69,7 +70,7 @@ public abstract class BaseRetryService implements RetryService {
                                                                                   return;
                                                                               }
                                                                               final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
-                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                                                                               retryPaymentTransaction(key.getAttemptId(), key.getPaymentControlPluginNames(), callContext);
                                                                           }
                                                                       }
@@ -132,7 +133,8 @@ public abstract class BaseRetryService implements RetryService {
         }
 
         protected InternalCallContext createCallContextFromPaymentId(final ObjectType objectType, final UUID objectId, final Long tenantRecordId) {
-            return internalCallContextFactory.createInternalCallContext(objectId, objectType, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
+            final String paymentRetryService = DefaultPaymentService.SERVICE_NAME + "-" + getQueueName();
+            return internalCallContextFactory.createInternalCallContext(objectId, objectType, paymentRetryService, CallOrigin.INTERNAL, UserType.SYSTEM, null, tenantRecordId);
         }
 
         public abstract String getQueueName();
diff --git a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
index ea320f1..03bdeee 100644
--- a/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
+++ b/payment/src/main/java/org/killbill/billing/payment/retry/RetryService.java
@@ -28,7 +28,7 @@ import org.killbill.notificationq.api.NotificationQueueService.NotificationQueue
 
 public interface RetryService {
 
-    public void initialize(final String svcName) throws NotificationQueueAlreadyExists;
+    public void initialize() throws NotificationQueueAlreadyExists;
 
     public void start();
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
index 809199f..8e2338a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestEventJson.java
@@ -34,7 +34,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
 
     @Test(groups = "fast")
     public void testPaymentErrorEvent() throws Exception {
-        final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
+        final PaymentErrorInternalEvent e = new DefaultPaymentErrorEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), TransactionStatus.SUCCESS, TransactionType.PURCHASE, "no message", 1L, 2L, UUID.randomUUID());
         final String json = mapper.writeValueAsString(e);
 
         final Class<?> claz = Class.forName(DefaultPaymentErrorEvent.class.getName());
@@ -44,7 +44,7 @@ public class TestEventJson extends PaymentTestSuiteNoDB {
 
     @Test(groups = "fast")
     public void testPaymentInfoEvent() throws Exception {
-        final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
+        final PaymentInfoInternalEvent e = new DefaultPaymentInfoEvent(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), new BigDecimal(12.9), Currency.EUR, TransactionStatus.SUCCESS,
                                                                        TransactionType.PURCHASE, new DateTime(), 1L, 2L, UUID.randomUUID());
         final String json = mapper.writeValueAsString(e);
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
index 7ad8a57..b7ba850 100644
--- a/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/glue/TestPaymentModuleWithEmbeddedDB.java
@@ -20,6 +20,7 @@ package org.killbill.billing.payment.glue;
 
 import org.killbill.billing.GuicyKillbillTestWithEmbeddedDBModule;
 import org.killbill.billing.account.glue.DefaultAccountModule;
+import org.killbill.billing.api.TestApiListener;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.util.glue.NonEntityDaoModule;
 import org.killbill.clock.Clock;
@@ -35,7 +36,7 @@ public class TestPaymentModuleWithEmbeddedDB extends TestPaymentModule {
         install(new GuicyKillbillTestWithEmbeddedDBModule(configSource));
         install(new NonEntityDaoModule(configSource));
         install(new DefaultAccountModule(configSource));
-
+        bind(TestApiListener.class).asEagerSingleton();
         super.configure();
     }
 }
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index bc68a36..cca3b59 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -23,13 +23,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
 
 import org.joda.time.LocalDate;
 import org.killbill.billing.account.api.Account;
+import org.killbill.billing.api.TestApiListener;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.invoice.api.Invoice;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.payment.api.DefaultPayment;
 import org.killbill.billing.payment.api.Payment;
 import org.killbill.billing.payment.api.PaymentApiException;
 import org.killbill.billing.payment.api.PaymentOptions;
@@ -37,13 +43,21 @@ import org.killbill.billing.payment.api.PaymentTransaction;
 import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.payment.api.TransactionStatus;
 import org.killbill.billing.payment.api.TransactionType;
+import org.killbill.billing.payment.bus.PaymentBusEventHandler;
 import org.killbill.billing.payment.core.janitor.Janitor;
 import org.killbill.billing.payment.dao.PaymentAttemptModelDao;
 import org.killbill.billing.payment.dao.PaymentTransactionModelDao;
+import org.killbill.billing.payment.glue.DefaultPaymentService;
 import org.killbill.billing.payment.invoice.InvoicePaymentRoutingPluginApi;
 import org.killbill.billing.payment.provider.MockPaymentProviderPlugin;
 import org.killbill.billing.platform.api.KillbillConfigSource;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.bus.api.PersistentBus.EventBusException;
+import org.killbill.commons.profiling.Profiling;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
@@ -53,11 +67,16 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
@@ -75,6 +94,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
     @Inject
     private Janitor janitor;
+    @Inject
+    private PaymentBusEventHandler handler;
+    @Inject
+    protected TestApiListener testListener;
+    @Inject
+    protected InternalCallContextFactory internalCallContextFactory;
+    @Inject
+    protected NotificationQueueService notificationQueueService;
 
     private MockPaymentProviderPlugin mockPaymentProviderPlugin;
 
@@ -93,6 +120,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     protected void beforeClass() throws Exception {
         super.beforeClass();
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
+        janitor.initialize();
         janitor.start();
     }
 
@@ -104,12 +132,17 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
         super.beforeMethod();
+        eventBus.register(handler);
+        testListener.reset();
+        eventBus.register(testListener);
         mockPaymentProviderPlugin.clear();
         account = testHelper.createTestAccount("bobo@gmail.com", true);
     }
 
     @AfterMethod(groups = "slow")
     public void afterMethod() throws Exception {
+        eventBus.unregister(handler);
+        eventBus.unregister(testListener);
         super.afterMethod();
     }
 
@@ -233,20 +266,23 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         final String paymentExternalKey = "qwru";
         final String transactionExternalKey = "lkjdsf";
 
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
-        // The UnknownPaymentTransactionTask will look for UNKNOWN payment that *just happened* , and that are not too old (less than 7 days)
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertNotificationsCompleted(internalCallContext, 5);
 
         final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
         assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
@@ -261,23 +297,26 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
         // Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
         mockPaymentProviderPlugin.makeNextPaymentFailWithError();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_ERROR);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
+        testListener.assertListenerStatus();
 
         final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
         Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
 
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertNotificationsCompleted(internalCallContext, 5);
 
         // Proves the Janitor ran (and updated the transaction)
         final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -299,26 +338,33 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
         // Make sure the state as seen by the plugin will be in PaymentPluginStatus.ERROR, which will be returned later to Janitor
         mockPaymentProviderPlugin.makeNextPaymentFailWithException();
         try {
+            testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
             paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                            transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
         } catch (PaymentApiException ignore) {
+            testListener.assertListenerStatus();
         }
         final Payment payment = paymentApi.getPaymentByExternalKey(paymentExternalKey, false, ImmutableList.<PluginProperty>of(), callContext);
 
         // Artificially move the transaction status to UNKNOWN
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
+
         final String paymentStateName = paymentSMHelper.getErroredStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT_PLUGIN_ERROR);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.UNKNOWN, requestedAmount, account.getCurrency(),
                                                            "foo", "bar", internalCallContext);
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        // NO because we will keep retrying as we can't fix it...
+        //assertNotificationsCompleted(internalCallContext, 5);
 
         final List<PaymentTransactionModelDao> paymentTransactionHistoryBeforeJanitor = getPaymentTransactionHistory(transactionExternalKey);
         Assert.assertEquals(paymentTransactionHistoryBeforeJanitor.size(), 3);
 
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+
 
         // Nothing new happened
         final List<PaymentTransactionModelDao> paymentTransactionHistoryAfterJanitor = getPaymentTransactionHistory(transactionExternalKey);
@@ -326,31 +372,36 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
     }
 
     @Test(groups = "slow")
-    public void testPendingEntries() throws PaymentApiException, EventBusException {
+    public void testPendingEntries() throws PaymentApiException, EventBusException, NoSuchNotificationQueue {
 
         final BigDecimal requestedAmount = BigDecimal.TEN;
         final String paymentExternalKey = "jhj44";
         final String transactionExternalKey = "4jhjj2";
 
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         final Payment payment = paymentApi.createAuthorization(account, account.getPaymentMethodId(), null, requestedAmount, account.getCurrency(), paymentExternalKey,
                                                                transactionExternalKey, ImmutableList.<PluginProperty>of(), callContext);
+        testListener.assertListenerStatus();
+
+        final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
 
         // Artificially move the transaction status to PENDING
         final String paymentStateName = paymentSMHelper.getPendingStateForTransaction(TransactionType.AUTHORIZE).toString();
+        testListener.pushExpectedEvent(NextEvent.PAYMENT);
         paymentDao.updatePaymentAndTransactionOnCompletion(account.getId(), payment.getId(), TransactionType.AUTHORIZE, paymentStateName, paymentStateName,
                                                            payment.getTransactions().get(0).getId(), TransactionStatus.PENDING, requestedAmount, account.getCurrency(),
                                                            "loup", "chat", internalCallContext);
-        clock.addDays(1);
-        try {
-            Thread.sleep(1500);
-        } catch (InterruptedException e) {
-        }
+        testListener.assertListenerStatus();
+
+        // Move clock for notification to be processed
+        clock.addDeltaFromReality(5 * 60 * 1000);
 
+        assertNotificationsCompleted(internalCallContext, 5);
         final Payment updatedPayment = paymentApi.getPayment(payment.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
         Assert.assertEquals(updatedPayment.getTransactions().get(0).getTransactionStatus(), TransactionStatus.SUCCESS);
-
     }
 
+
     private List<PluginProperty> createPropertiesForInvoice(final Invoice invoice) {
         final List<PluginProperty> result = new ArrayList<PluginProperty>();
         result.add(new PluginProperty(InvoicePaymentRoutingPluginApi.PROP_IPCD_INVOICE_ID, invoice.getId().toString(), false));
@@ -385,5 +436,19 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
             }
         });
     }
+
+    private void assertNotificationsCompleted(final InternalCallContext internalCallContext, final long timeoutSec) {
+        try {
+            await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    final List<NotificationEventWithMetadata<NotificationEvent>> notifications = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+                    return notifications.isEmpty();
+                }
+            });
+        } catch (final Exception e) {
+            fail("Test failed ", e);
+        }
+    }
 }
 
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
index 2a1ae78..dcf9a9a 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestRetryService.java
@@ -70,7 +70,7 @@ public class TestRetryService extends PaymentTestSuiteNoDB {
         ((MockPaymentDao) paymentDao).reset();
         mockPaymentProviderPlugin = (MockPaymentProviderPlugin) registry.getServiceForName(MockPaymentProviderPlugin.PLUGIN_NAME);
         mockPaymentProviderPlugin.clear();
-        retryService.initialize(DefaultPaymentService.SERVICE_NAME);
+        retryService.initialize();
         retryService.start();
 
     }