killbill-aplcache

util: first pass at linking bus events and notifications Link

11/29/2012 12:51:27 AM

Changes

Details

diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index ef98c42..517ea8d 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -140,7 +140,7 @@ public class BeatrixListener {
         default:
         }
         return eventBusType != null ?
-                new ExtBusEventEntry(hostname, objectType, objectId, eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
+                new ExtBusEventEntry(hostname, objectType, objectId, event.getUserToken(), eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
     }
 
 }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
index b882a7c..a34e62f 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
@@ -37,12 +37,12 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
     private final ObjectType objectType;
     private final UUID objectId;
     private final ExtBusEventType extBusType;
-
+    private final UUID userToken;
 
     public ExtBusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
-                         final PersistentQueueEntryLifecycleState processingState,
-                         final ObjectType objectType, final UUID objectId, final  ExtBusEventType extBusType,
-                         final Long accountRecordId, final Long tenantRecordId) {
+                            final PersistentQueueEntryLifecycleState processingState,
+                            final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+                            final Long accountRecordId, final Long tenantRecordId) {
         this.id = id;
         this.createdOwner = createdOwner;
         this.owner = owner;
@@ -51,14 +51,14 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
         this.objectType = objectType;
         this.objectId = objectId;
         this.extBusType = extBusType;
+        this.userToken = userToken;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
-    public ExtBusEventEntry(final String createdOwner,
-            final ObjectType objectType, final UUID objectId, final  ExtBusEventType extBusType,
-                         final Long accountRecordId, final Long tenantRecordId) {
-        this(0, createdOwner, null, null, null, objectType, objectId, extBusType, accountRecordId, tenantRecordId);
+    public ExtBusEventEntry(final String createdOwner, final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+                            final Long accountRecordId, final Long tenantRecordId) {
+        this(0, createdOwner, null, null, null, objectType, objectId, userToken, extBusType, accountRecordId, tenantRecordId);
     }
 
     public long getId() {
@@ -78,6 +78,11 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
     }
 
     @Override
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
     public String getOwner() {
         return owner;
     }
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
index 2f3c3d0..f174158 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
@@ -13,6 +13,7 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
+
 package com.ning.billing.beatrix.extbus.dao;
 
 import java.sql.ResultSet;
@@ -42,43 +43,41 @@ import com.ning.billing.util.dao.BinderBase;
 import com.ning.billing.util.dao.MapperBase;
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
-
 @ExternalizedSqlViaStringTemplate3()
 public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
 
-
     @SqlQuery
     @Mapper(ExtBusSqlMapper.class)
     public ExtBusEventEntry getNextBusExtEventEntry(@Bind("max") int max,
-                                              @Bind("owner") String owner,
-                                              @Bind("now") Date now,
-                                              @InternalTenantContextBinder final InternalTenantContext context);
+                                                    @Bind("owner") String owner,
+                                                    @Bind("now") Date now,
+                                                    @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
     public int claimBusExtEvent(@Bind("owner") String owner,
-                             @Bind("nextAvailable") Date nextAvailable,
-                             @Bind("recordId") Long id,
-                             @Bind("now") Date now,
-                             @InternalTenantContextBinder final InternalCallContext context);
+                                @Bind("nextAvailable") Date nextAvailable,
+                                @Bind("recordId") Long id,
+                                @Bind("now") Date now,
+                                @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void clearBusExtEvent(@Bind("recordId") Long id,
-                              @Bind("owner") String owner,
-                              @InternalTenantContextBinder final InternalCallContext context);
+                                 @Bind("owner") String owner,
+                                 @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void removeBusExtEventsById(@Bind("recordId") Long id,
-                                    @InternalTenantContextBinder final InternalCallContext context);
+                                       @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void insertBusExtEvent(@Bind(binder = ExtBusSqlBinder.class) ExtBusEventEntry evt,
-                               @InternalTenantContextBinder final InternalCallContext context);
+                                  @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
     public void insertClaimedExtHistory(@Bind("ownerId") String owner,
-                                     @Bind("claimedDate") Date claimedDate,
-                                     @Bind("busEventId") long id,
-                                     @InternalTenantContextBinder final InternalCallContext context);
+                                        @Bind("claimedDate") Date claimedDate,
+                                        @Bind("busEventId") long id,
+                                        @InternalTenantContextBinder final InternalCallContext context);
 
     public static class ExtBusSqlBinder extends BinderBase implements Binder<Bind, ExtBusEventEntry> {
 
@@ -87,6 +86,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             stmt.bind("eventType", evt.getExtBusType().toString());
             stmt.bind("objectId", evt.getObjectId().toString());
             stmt.bind("objectType", evt.getObjectType().toString());
+            stmt.bind("userToken", getUUIDString(evt.getUserToken()));
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -104,6 +104,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             final ExtBusEventType eventType = ExtBusEventType.valueOf(r.getString("event_type"));
             final UUID objectId = getUUID(r, "object_id");
             final ObjectType objectType = ObjectType.valueOf(r.getString("object_type"));
+            final UUID userToken = getUUID(r, "user_token");
             final String createdOwner = r.getString("creating_owner");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
             final String processingOwner = r.getString("processing_owner");
@@ -111,7 +112,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
             final Long accountRecordId = r.getLong("account_record_id");
             final Long tenantRecordId = r.getLong("tenant_record_id");
             return new ExtBusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState,
-                    objectType, objectId, eventType, accountRecordId, tenantRecordId);
+                                        objectType, objectId, userToken, eventType, accountRecordId, tenantRecordId);
         }
     }
 }
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
index 13857a7..93d3421 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
@@ -6,6 +6,7 @@ CREATE TABLE bus_ext_events (
     event_type varchar(32) NOT NULL,
     object_id varchar(64) NOT NULL,
     object_type varchar(32) NOT NULL,
+    user_token char(36),
     created_date datetime NOT NULL,
     creating_owner char(50) NOT NULL,
     processing_owner char(50) DEFAULT NULL,
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
index 897d273..c1acfa4 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
@@ -9,6 +9,7 @@ getNextBusExtEventEntry() ::= <<
       , event_type
       , object_id
       , object_type
+      , user_token
       , created_date
       , creating_owner
       , processing_owner
@@ -66,6 +67,7 @@ insertBusExtEvent() ::= <<
      event_type
     , object_id
     , object_type
+    , user_token
     , created_date
     , creating_owner
     , processing_owner
@@ -76,7 +78,8 @@ insertBusExtEvent() ::= <<
     ) values (
       :eventType
     , :objectId
-    , :objectType    
+    , :objectType
+    , :userToken
     , :createdDate
     , :creatingOwner
     , :processingOwner
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 27c9eea..10ac6c9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -115,7 +115,7 @@ public class Engine implements EventListener, EntitlementService {
         try {
             final NotificationQueueHandler queueHandler = new NotificationQueueHandler() {
                 @Override
-                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID fromNotificationQueueUserToken, final Long accountRecordId, final Long tenantRecordId) {
                     if (!(inputKey instanceof EntitlementNotificationKey)) {
                         log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
                         return;
@@ -128,7 +128,8 @@ public class Engine implements EventListener, EntitlementService {
                         return;
                     }
 
-                    final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
+                    // TODO STEPH?
+                    final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : fromNotificationQueueUserToken;
                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                     processEventReady(event, key.getSeqId(), context);
                 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index aecdd99..c033364 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -853,7 +853,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, null, notificationKey, context);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index b73c1e3..408a0f9 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -412,7 +412,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, null, notificationKey, context);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey, context);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index c1920f4..02b3ec5 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -55,7 +55,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.inject.Inject;
 
@@ -199,7 +198,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                         transInvoiceItemSqlDao.create(invoiceItemModelDao, context);
                     }
 
-                    notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions);
+                    notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions, context.getUserToken());
 
                     // Create associated payments
                     final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
@@ -939,10 +938,11 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
         invoice.addPayments(invoicePayments);
     }
 
-    private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final Map<UUID, DateTime> callbackDateTimePerSubscriptions) {
+    private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+                                             final Map<UUID, DateTime> callbackDateTimePerSubscriptions, final UUID userToken) {
         for (final UUID subscriptionId : callbackDateTimePerSubscriptions.keySet()) {
             final DateTime callbackDateTimeUTC = callbackDateTimePerSubscriptions.get(subscriptionId);
-            nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC);
+            nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC, userToken);
         }
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
index 25bb819..74914d7 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
@@ -73,9 +73,9 @@ public class InvoiceListener {
         }
     }
 
-    public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+    public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscription(subscriptionId, eventDateTime, context);
         } catch (InvoiceApiException e) {
             log.error(e.getMessage());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 6e3a8d7..757859d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -82,7 +82,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 try {
                     if (!(notificationKey instanceof NextBillingDateNotificationKey)) {
                         log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
@@ -95,7 +95,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                         if (subscription == null) {
                             log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
                         } else {
-                            processEvent(key.getUuidKey(), eventDate, accountRecordId, tenantRecordId);
+                            processEvent(key.getUuidKey(), eventDate, userToken, accountRecordId, tenantRecordId);
                         }
                     } catch (EntitlementUserApiException e) {
                         log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
@@ -125,7 +125,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
         }
     }
 
-    private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-        listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, accountRecordId, tenantRecordId);
+    private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, userToken, accountRecordId, tenantRecordId);
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index f923aff..c17af59 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -52,8 +52,8 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
 
     @Override
     public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
-                                              final UUID subscriptionId, final DateTime futureNotificationTime) {
-        final InternalCallContext context = createCallContext(accountId);
+                                              final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
+        final InternalCallContext context = createCallContext(accountId, userToken);
 
         final NotificationQueue nextBillingQueue;
         try {
@@ -61,7 +61,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
             log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
 
-            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, accountId,
+            nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime,
                                                                      new NextBillingDateNotificationKey(subscriptionId), context);
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
@@ -70,7 +70,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
         }
     }
 
-    private InternalCallContext createCallContext(final UUID accountId) {
-        return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+    private InternalCallContext createCallContext(final UUID accountId, final UUID userToken) {
+        return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
index 95955f7..1a8da73 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -25,7 +25,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 
 public interface NextBillingDatePoster {
 
-    void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
-                                       UUID subscriptionId, DateTime futureNotificationTime);
-
+    void insertNextBillingNotification(EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
+                                       UUID subscriptionId, DateTime futureNotificationTime, UUID userToken);
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
index 6b77d80..54091f5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
@@ -26,6 +26,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 public class MockNextBillingDatePoster implements NextBillingDatePoster {
 
     @Override
-    public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final UUID subscriptionId, final DateTime futureNotificationTime) {
+    public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+                                              final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
     }
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 3b9a47e..affe419 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -92,8 +92,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB 
         }
 
         @Override
-        public void handleNextBillingDateEvent(final UUID subscriptionId,
-                                               final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+        public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
             eventCount++;
             latestSubscriptionId = subscriptionId;
         }
@@ -183,7 +182,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB 
         entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime);
+                poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime, UUID.randomUUID());
                 return null;
             }
         });
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 39b7ef4..67e783e 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -16,15 +16,16 @@
 
 package com.ning.billing.ovedue.notification;
 
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.overdue.OverdueProperties;
 import com.ning.billing.overdue.listener.OverdueListener;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -69,7 +70,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
 
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 try {
                     if (!(notificationKey instanceof OverdueCheckNotificationKey)) {
                         log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
@@ -77,7 +78,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
                     }
 
                     final OverdueCheckNotificationKey key = (OverdueCheckNotificationKey) notificationKey;
-                    listener.handleNextOverdueCheck(key, accountRecordId, tenantRecordId);
+                    listener.handleNextOverdueCheck(key, userToken, accountRecordId, tenantRecordId);
                 } catch (IllegalArgumentException e) {
                     log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
                 }
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index ed401f1..2cb149e 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -52,7 +52,7 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
             log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
 
-            checkOverdueQueue.recordFutureNotification(futureNotificationTime, null, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
+            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
         } catch (IOException e) {
diff --git a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
index 86af3de..1733da5 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
@@ -67,10 +67,10 @@ public class OverdueListener {
         dispatcher.processOverdueForAccount(accountId, createCallContext(event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId()));
     }
 
-    public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final Long accountRecordId, final Long tenantRecordId) {
+    public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         log.info(String.format("Received OD checkup notification for type = %s, id = %s",
                 notificationKey.getType(), notificationKey.getUuidKey()));
-        dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(null, accountRecordId, tenantRecordId));
+        dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(userToken, accountRecordId, tenantRecordId));
     }
 
     private InternalCallContext createCallContext(final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index d91d213..34aca06 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -16,9 +16,6 @@
 
 package com.ning.billing.overdue.notification;
 
-import static com.jayway.awaitility.Awaitility.await;
-import static java.util.concurrent.TimeUnit.MINUTES;
-
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.UUID;
@@ -38,8 +35,6 @@ import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
 import com.ning.billing.catalog.DefaultCatalogService;
 import com.ning.billing.catalog.api.CatalogService;
-import com.ning.billing.util.config.CatalogConfig;
-import com.ning.billing.util.config.InvoiceConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
@@ -62,8 +57,10 @@ import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.customfield.dao.DefaultCustomFieldDao;
+import com.ning.billing.util.config.CatalogConfig;
+import com.ning.billing.util.config.InvoiceConfig;
 import com.ning.billing.util.customfield.dao.CustomFieldDao;
+import com.ning.billing.util.customfield.dao.DefaultCustomFieldDao;
 import com.ning.billing.util.email.EmailModule;
 import com.ning.billing.util.email.templates.TemplateModule;
 import com.ning.billing.util.globallocker.GlobalLocker;
@@ -81,6 +78,9 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Stage;
 
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
     private Clock clock;
     private DefaultOverdueCheckNotifier notifier;
@@ -98,7 +98,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
         }
 
         @Override
-        public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final Long accountRecordId, final Long tenantRecordId) {
+        public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
             eventCount++;
             latestSubscriptionId = key.getUuidKey();
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index 2828c63..d1b78cc 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -66,13 +66,13 @@ public abstract class BaseRetryService implements RetryService {
                                                                       getQueueName(),
                                                                       new NotificationQueueHandler() {
                                                                           @Override
-                                                                          public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                          public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                                                                               if (!(notificationKey instanceof PaymentRetryNotificationKey)) {
                                                                                   log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
                                                                                   return;
                                                                               }
                                                                               final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
-                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null);
+                                                                              final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
                                                                               retry(key.getUuidKey(), callContext);
                                                                           }
                                                                       },
@@ -141,9 +141,9 @@ public abstract class BaseRetryService implements RetryService {
                 final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
                 if (retryQueue != null) {
                     if (transactionalDao == null) {
-                        retryQueue.recordFutureNotification(timeOfRetry, null, key, context);
+                        retryQueue.recordFutureNotification(timeOfRetry, key, context);
                     } else {
-                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, null, key, context);
+                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key, context);
                     }
                 }
             } catch (NoSuchNotificationQueue e) {
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index c77521d..98be956 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.bus.dao;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
@@ -29,12 +31,13 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
     private final PersistentQueueEntryLifecycleState processingState;
     private final String busEventClass;
     private final String busEventJson;
+    private final UUID userToken;
     private final Long accountRecordId;
     private final Long tenantRecordId;
 
     public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
                          final PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson,
-                         final Long accountRecordId, final Long tenantRecordId) {
+                         final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
         this.id = id;
         this.createdOwner = createdOwner;
         this.owner = owner;
@@ -42,13 +45,14 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
         this.processingState = processingState;
         this.busEventClass = busEventClass;
         this.busEventJson = busEventJson;
+        this.userToken = userToken;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
     public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson,
-                         final Long accountRecordId, final Long tenantRecordId) {
-        this(0, createdOwner, null, null, null, busEventClass, busEventJson, accountRecordId, tenantRecordId);
+                         final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        this(0, createdOwner, null, null, null, busEventClass, busEventJson, userToken, accountRecordId, tenantRecordId);
     }
 
     public long getId() {
@@ -64,6 +68,11 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
     }
 
     @Override
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
     public String getOwner() {
         return owner;
     }
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 0f2ad73..3203c11 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.bus.dao;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.SQLStatement;
@@ -82,6 +83,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
         public void bind(@SuppressWarnings("rawtypes") final SQLStatement stmt, final Bind bind, final BusEventEntry evt) {
             stmt.bind("className", evt.getBusEventClass());
             stmt.bind("eventJson", evt.getBusEventJson());
+            stmt.bind("userToken", getUUIDString(evt.getUserToken()));
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -100,6 +102,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             final String className = r.getString("class_name");
             final String createdOwner = r.getString("creating_owner");
             final String eventJson = r.getString("event_json");
+            final UUID userToken = getUUID(r, "user_token");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
             final String processingOwner = r.getString("processing_owner");
             final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
@@ -107,7 +110,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
             final Long tenantRecordId = r.getLong("tenant_record_id");
 
             return new BusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState, className,
-                                     eventJson, accountRecordId, tenantRecordId);
+                                     eventJson, userToken, accountRecordId, tenantRecordId);
         }
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index 859fce8..57122fe 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ThreadFactory;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -182,16 +181,15 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
     private void postFromTransaction(final BusInternalEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
         try {
             final String json = objectMapper.writeValueAsString(event);
-            final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getAccountRecordId(), context.getTenantRecordId());
+            final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
             transactional.insertBusEvent(entry, context);
         } catch (Exception e) {
             log.error("Failed to post BusEvent " + event, e);
         }
     }
 
-
     private String tweakJsonToIncludeAccountAndTenantRecordId(final String input, final Long accountRecordId, final Long tenantRecordId) {
-        int lastIndexPriorFinalBracket = input.lastIndexOf("}");
+        final int lastIndexPriorFinalBracket = input.lastIndexOf("}");
         final StringBuilder tmp = new StringBuilder(input.substring(0, lastIndexPriorFinalBracket));
         tmp.append(",\"accountRecordId\":");
         tmp.append(accountRecordId);
@@ -200,5 +198,4 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         tmp.append("}");
         return tmp.toString();
     }
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
index 27cb985..48f2534 100644
--- a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
+++ b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
@@ -17,11 +17,17 @@
 package com.ning.billing.util.dao;
 
 import java.util.Date;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 
 public abstract class BinderBase {
+
     protected Date getDate(final DateTime dateTime) {
         return dateTime == null ? null : dateTime.toDate();
     }
+
+    protected String getUUIDString(final UUID uuid) {
+        return uuid == null ? null : uuid.toString();
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 811846e..66d6688 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -60,7 +60,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
 
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
-    public List<Notification> getNotificationForAccountAndDate(@Bind("accountId") final String accountId,
+    public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
                                                                @Bind("effectiveDate") final Date effectiveDate,
                                                                @InternalTenantContextBinder final InternalTenantContext context);
 
@@ -102,7 +102,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
             stmt.bind("className", evt.getNotificationKeyClass());
-            stmt.bind("accountId", evt.getAccountId() != null ? evt.getAccountId().toString() : null);
+            // The current user token will be bound with the InternalTenantContextBinder
+            stmt.bind("futureUserToken", getUUIDString(evt.getFutureUserToken()));
             stmt.bind("notificationKey", evt.getNotificationKey());
             stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
             stmt.bind("queueName", evt.getQueueName());
@@ -123,7 +124,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final String createdOwner = r.getString("creating_owner");
             final String className = r.getString("class_name");
             final String notificationKey = r.getString("notification_key");
-            final UUID accountId = getUUID(r, "account_id");
+            final UUID userToken = getUUID(r, "user_token");
+            final UUID futureUserToken = getUUID(r, "future_user_token");
             final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDateTime(r, "effective_date");
             final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
@@ -133,7 +135,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final Long tenantRecordId = r.getLong("tenant_record_id");
 
             return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
-                                           processingState, className, notificationKey, accountId, effectiveDate,
+                                           processingState, className, notificationKey, userToken, futureUserToken, effectiveDate,
                                            accountRecordId, tenantRecordId);
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 848ed9a..c5310a4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -32,15 +32,16 @@ public class DefaultNotification extends EntityBase implements Notification {
     private final PersistentQueueEntryLifecycleState lifecycleState;
     private final String notificationKeyClass;
     private final String notificationKey;
+    private final UUID userToken;
+    private final UUID futureUserToken;
     private final DateTime effectiveDate;
-    private final UUID accountId;
     private final Long accountRecordId;
     private final Long tenantRecordId;
 
     public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName,
                                final DateTime nextAvailableDate, final PersistentQueueEntryLifecycleState lifecycleState,
-                               final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate,
-                               final Long accountRecordId, final Long tenantRecordId) {
+                               final String notificationKeyClass, final String notificationKey, final UUID userToken, final UUID futureUserToken,
+                               final DateTime effectiveDate, final Long accountRecordId, final Long tenantRecordId) {
         super(id);
         this.ordering = ordering;
         this.owner = owner;
@@ -50,17 +51,18 @@ public class DefaultNotification extends EntityBase implements Notification {
         this.lifecycleState = lifecycleState;
         this.notificationKeyClass = notificationKeyClass;
         this.notificationKey = notificationKey;
-        this.accountId = accountId;
+        this.userToken = userToken;
+        this.futureUserToken = futureUserToken;
         this.effectiveDate = effectiveDate;
         this.accountRecordId = accountRecordId;
         this.tenantRecordId = tenantRecordId;
     }
 
     public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass,
-                               final String notificationKey, final UUID accountId, final DateTime effectiveDate,
+                               final String notificationKey, final UUID userToken, final UUID futureUserToken, final DateTime effectiveDate,
                                final Long accountRecordId, final Long tenantRecordId) {
         this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE,
-             notificationKeyClass, notificationKey, accountId, effectiveDate, accountRecordId, tenantRecordId);
+             notificationKeyClass, notificationKey, userToken, futureUserToken, effectiveDate, accountRecordId, tenantRecordId);
     }
 
     @Override
@@ -128,8 +130,13 @@ public class DefaultNotification extends EntityBase implements Notification {
     }
 
     @Override
-    public UUID getAccountId() {
-        return accountId;
+    public UUID getUserToken() {
+        return userToken;
+    }
+
+    @Override
+    public UUID getFutureUserToken() {
+        return futureUserToken;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 6f58a2e..b054d93 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -20,31 +20,36 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
+import com.google.common.collect.ImmutableList;
+
 public class DefaultNotificationQueue extends NotificationQueueBase {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
+    private final IDBI dbi;
     private final NotificationSqlDao dao;
     private final InternalCallContextFactory internalCallContextFactory;
 
@@ -52,6 +57,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
                                     final NotificationQueueHandler handler, final NotificationConfig config,
                                     final InternalCallContextFactory internalCallContextFactory) {
         super(clock, svcName, queueName, handler, config);
+        this.dbi = dbi;
         this.dao = dbi.onDemand(NotificationSqlDao.class);
         this.internalCallContextFactory = internalCallContextFactory;
     }
@@ -60,7 +66,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     public int doProcessEvents() {
         logDebug("ENTER doProcessEvents");
         // Finding and claiming notifications is not done per tenant (yet?)
-        final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
+        final List<Notification> notifications = getReadyNotifications(createCallContext(null, null, null));
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
             return 0;
@@ -73,9 +79,9 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             getNbProcessedEvents().incrementAndGet();
             logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
             final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
             result++;
-            clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
+            clearNotification(cur, createCallContext(cur.getUserToken(), cur.getTenantRecordId(), cur.getAccountRecordId()));
             logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
 
@@ -84,30 +90,31 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
 
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime,
-                                         final UUID accountId,
                                          final NotificationKey notificationKey,
                                          final InternalCallContext context) throws IOException {
-        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, dao, context);
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao, context);
     }
 
     @Override
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
                                                         final DateTime futureNotificationTime,
-                                                        final UUID accountId,
                                                         final NotificationKey notificationKey,
                                                         final InternalCallContext context) throws IOException {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
-        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, transactionalNotificationDao, context);
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao, context);
     }
 
     private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
-                                                  final UUID accountId,
                                                   final NotificationKey notificationKey,
                                                   final NotificationSqlDao thisDao,
                                                   final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
+        // Create a new user token. This will be used in the future, when this notification is triggered, to trace
+        // generated bus events
+        final UUID futureUserToken = UUID.randomUUID();
         final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
-                                                                  accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
+                                                                  context.getUserToken(), futureUserToken, futureNotificationTime,
+                                                                  context.getAccountRecordId(), context.getTenantRecordId());
         thisDao.insertNotification(notification, context);
     }
 
@@ -158,7 +165,24 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
 
     @Override
     public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        return dao.getNotificationForAccountAndDate(accountId.toString(), effectiveDate.toDate(), context);
+        // TODO we have the same use case in InternalCallContextFactory, do we need some sort of helper class?
+        final Long accountRecordId = dbi.withHandle(new HandleCallback<Long>() {
+            @Override
+            public Long withHandle(final Handle handle) throws Exception {
+                final List<Map<String, Object>> values = handle.select("select record_id from accounts where id = " + accountId.toString());
+                if (values.size() == 0) {
+                    return null;
+                } else {
+                    return (Long) values.get(0).get("record_id");
+                }
+            }
+        });
+
+        if (accountId == null) {
+            return ImmutableList.<Notification>of();
+        } else {
+            return dao.getNotificationForAccountAndDate(accountRecordId, effectiveDate.toDate(), context);
+        }
     }
 
     @Override
@@ -166,7 +190,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         dao.removeNotification(notificationId.toString(), context);
     }
 
-    private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
-        return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+    private InternalCallContext createCallContext(final UUID userToken, @Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
+        return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index a790d15..4c28ff5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -35,6 +35,8 @@ public interface Notification extends PersistentQueueEntryLifecycle, Entity {
 
     public String getQueueName();
 
-    // TODO - do we still need it now we have account_record_id?
-    public UUID getAccountId();
+    // Future user token, i.e. user token of the context when this notification will be claimed.
+    // The user token can be used as a trace to follow events (e.g. all bus events triggered as a result of a
+    // claimed notification will share the same user token)
+    public UUID getFutureUserToken();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index e592674..22cb36a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -21,10 +21,8 @@ import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.entity.dao.EntityDao;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.queue.QueueLifecycle;
@@ -38,7 +36,6 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param notificationKey        the key for that notification
      */
     public void recordFutureNotification(final DateTime futureNotificationTime,
-                                         final UUID accountId,
                                          final NotificationKey notificationKey,
                                          final InternalCallContext context)
             throws IOException;
@@ -52,7 +49,6 @@ public interface NotificationQueue extends QueueLifecycle {
      */
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
                                                         final DateTime futureNotificationTime,
-                                                        final UUID accountId,
                                                         final NotificationKey notificationKey,
                                                         final InternalCallContext context)
             throws IOException;
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d6df6dd..f83a1ec 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.config.NotificationConfig;
@@ -28,10 +30,11 @@ public interface NotificationQueueService {
          * Called for each notification ready
          *
          * @param notificationKey the notification key associated to that notification entry
+         * @param userToken user token associated with that notification entry
          * @param accountRecordId account record id associated with that notification entry
          * @param tenantRecordId  tenant record id associated with that notification entry
          */
-        public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, Long accountRecordId, Long tenantRecordId);
+        public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, UUID userToken, Long accountRecordId, Long tenantRecordId);
     }
 
     public static final class NotificationQueueAlreadyExists extends Exception {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
index ea4ab88..922ed34 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.queue;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 public interface PersistentQueueEntryLifecycle {
@@ -40,4 +42,8 @@ public interface PersistentQueueEntryLifecycle {
     public PersistentQueueEntryLifecycleState getProcessingState();
 
     public boolean isAvailableForProcessing(DateTime now);
+
+    // User token associated with this bus event or notification (i.e. user token from the context that
+    // was used to generate this PersistentQueueEntryLifecycle)
+    public UUID getUserToken();
 }
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 5d67ee3..18ec252 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -8,6 +8,7 @@ getNextBusEventEntry() ::= <<
       record_id
       , class_name
       , event_json
+      , user_token
       , created_date
       , creating_owner
       , processing_owner
@@ -64,6 +65,7 @@ insertBusEvent() ::= <<
     insert into bus_events (
       class_name
     , event_json
+    , user_token
     , created_date
     , creating_owner
     , processing_owner
@@ -74,6 +76,7 @@ insertBusEvent() ::= <<
     ) values (
       :className
     , :eventJson
+    , :userToken
     , :createdDate
     , :creatingOwner
     , :processingOwner
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 404ba91..647fa9c 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -128,8 +128,9 @@ CREATE TABLE notifications (
     id char(36) NOT NULL,
     created_date datetime NOT NULL,
     class_name varchar(256) NOT NULL,
-    account_id  char(36),
     notification_key varchar(2048) NOT NULL,
+    user_token char(36),
+    future_user_token char(36),
     creating_owner char(50) NOT NULL,
     effective_date datetime NOT NULL,
     queue_name char(64) NOT NULL,
@@ -181,8 +182,9 @@ CREATE INDEX audit_log_tenant_account_record_id ON audit_log(tenant_record_id, a
 DROP TABLE IF EXISTS bus_events;
 CREATE TABLE bus_events (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
-    class_name varchar(128) NOT NULL, 
-    event_json varchar(2048) NOT NULL,     
+    class_name varchar(128) NOT NULL,
+    event_json varchar(2048) NOT NULL,
+    user_token char(36),
     created_date datetime NOT NULL,
     creating_owner char(50) NOT NULL,
     processing_owner char(50) DEFAULT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 8287ea4..3b3aefc 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -8,8 +8,9 @@ getReadyNotifications() ::= <<
       record_id
       , id
       , class_name
-      , account_id
       , notification_key
+      , user_token
+      , future_user_token
       , created_date
       , creating_owner
       , effective_date
@@ -37,11 +38,12 @@ getReadyNotifications() ::= <<
 
 getNotificationForAccountAndDate() ::= <<
    select
-     record_id
+       record_id
      , id
      , class_name
-     , account_id
      , notification_key
+     , user_token
+     , future_user_token
      , created_date
      , creating_owner
      , effective_date
@@ -53,7 +55,7 @@ getNotificationForAccountAndDate() ::= <<
      , tenant_record_id
    from notifications
    where
-   account_id = :accountId AND effective_date = :effectiveDate
+   account_record_id = :accountRecordId AND effective_date = :effectiveDate
    ;
 >>
 
@@ -102,8 +104,9 @@ insertNotification() ::= <<
     insert into notifications (
       id
       , class_name
-      , account_id
       , notification_key
+      , user_token
+      , future_user_token
       , created_date
       , creating_owner
       , effective_date
@@ -116,8 +119,9 @@ insertNotification() ::= <<
     ) values (
       :id
       , :className
-      , :accountId
       , :notificationKey
+      , :userToken
+      , :futureUserToken
       , :createdDate
       , :creatingOwner
       , :effectiveDate
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 54be599..264f10c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -26,7 +26,6 @@ import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
 import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -46,7 +45,6 @@ import static org.testng.Assert.assertNotNull;
 
 @Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
 public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
-    private static final UUID accountId = UUID.randomUUID();
     private static final String hostname = "Yop";
 
     @Inject
@@ -62,26 +60,15 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
         dao = dbi.onDemand(NotificationSqlDao.class);
     }
 
-    @BeforeTest(groups = "slow")
-    public void cleanupDb() {
-        dbi.withHandle(new HandleCallback<Void>() {
-            @Override
-            public Void withHandle(final Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                return null;
-            }
-        });
-    }
-
     @Test(groups = "slow")
     public void testBasic() throws InterruptedException {
+        final long accountRecordId = 1242L;
         final String ownerId = UUID.randomUUID().toString();
 
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                           null, internalCallContext.getTenantRecordId());
+        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                           accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif, internalCallContext);
 
         Thread.sleep(1000);
@@ -121,24 +108,25 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
 
     @Test(groups = "slow")
     public void testGetByAccountAndDate() throws InterruptedException {
+        final long accountRecordId = 1242L;
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                            null, internalCallContext.getTenantRecordId());
+        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                            accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif1, internalCallContext);
 
-        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
-                                                            null, internalCallContext.getTenantRecordId());
+        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+                                                            accountRecordId, internalCallContext.getTenantRecordId());
         dao.insertNotification(notif2, internalCallContext);
 
-        List<Notification> notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+        List<Notification> notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
         assertEquals(notifications.size(), 2);
         for (final Notification cur : notifications) {
             Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
             dao.removeNotification(cur.getId().toString(), internalCallContext);
         }
 
-        notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+        notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
         assertEquals(notifications.size(), 2);
         for (final Notification cur : notifications) {
             Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
@@ -153,8 +141,9 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
                                                   " record_id " +
                                                   ", id" +
                                                   ", class_name" +
-                                                  ", account_id" +
                                                   ", notification_key" +
+                                                  ", user_token" +
+                                                  ", future_user_token" +
                                                   ", created_date" +
                                                   ", creating_owner" +
                                                   ", effective_date" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index ec917f2..2ed6079 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -24,11 +24,10 @@ import java.util.TreeSet;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
@@ -56,11 +55,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
-                                         final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
-                                                                  null, 0L);
+        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, context.getUserToken(),
+                                                                  UUID.randomUUID(), futureNotificationTime, null, 0L);
         synchronized (notifications) {
             notifications.add(notification);
         }
@@ -68,8 +66,8 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
 
     @Override
     public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final DateTime futureNotificationTime,
-                                                        final UUID accountId, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
-        recordFutureNotification(futureNotificationTime, accountId, notificationKey, context);
+                                                        final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+        recordFutureNotification(futureNotificationTime, notificationKey, context);
     }
 
     public List<Notification> getPendingEvents() {
@@ -102,11 +100,11 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         result = readyNotifications.size();
         for (final Notification cur : readyNotifications) {
             final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
             final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
                                                                                       "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
                                                                                       PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
-                                                                                      cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
+                                                                                      cur.getNotificationKey(), cur.getUserToken(), UUID.randomUUID(), cur.getEffectiveDate(),
                                                                                       cur.getAccountRecordId(), cur.getTenantRecordId());
             oldNotifications.add(cur);
             processedNotifications.add(processedNotification);
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index fd7da4e..03ea6ae 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -24,11 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -38,12 +34,12 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -66,10 +62,8 @@ import static org.testng.Assert.assertEquals;
 
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
-    private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
-
 
-    private static final UUID accountId = UUID.randomUUID();
+    private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
 
     private EntitySqlDaoTransactionalJdbiWrapper entitySqlDaoTransactionalJdbiWrapper;
 
@@ -82,11 +76,10 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     @Inject
     private Clock clock;
 
-    private DummySqlTest dao;
-
     private int eventsReceived;
 
     private static final class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
+
         private final String value;
 
         @JsonCreator
@@ -109,22 +102,11 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     public void setup() throws Exception {
         final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
         helper.initDb(testDdl);
-        dao = dbi.onDemand(DummySqlTest.class);
         entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(dbi);
     }
 
     @BeforeTest(groups = "slow")
     public void beforeTest() {
-        dbi.withHandle(new HandleCallback<Void>() {
-
-            @Override
-            public Void withHandle(final Handle handle) throws Exception {
-                handle.execute("delete from notifications");
-                handle.execute("delete from claimed_notifications");
-                handle.execute("delete from dummy");
-                return null;
-            }
-        });
         // Reset time to real value
         ((ClockMock) clock).resetDeltaFromReality();
         eventsReceived = 0;
@@ -144,7 +126,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
                                                                             new NotificationQueueHandler() {
                                                                                 @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                                                                                     synchronized (expectedNotifications) {
                                                                                         log.info("Handler received key: " + notificationKey);
 
@@ -172,7 +154,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
 
                 entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKey, internalCallContext);
                 log.info("Posted key: " + notificationKey);
 
                 return null;
@@ -201,7 +183,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
                                                                             new NotificationQueueHandler() {
                                                                                 @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                                                                                     synchronized (expectedNotifications) {
                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
                                                                                         expectedNotifications.notify();
@@ -232,12 +214,11 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
                     entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
                     queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, now.plus((currentIteration + 1) * nextReadyTimeIncrementMs),
-                                                                  accountId, notificationKey, internalCallContext);
+                                                                  notificationKey, internalCallContext);
                     return null;
                 }
             });
 
-
             // Move time in the future after the notification effectiveDate
             if (i == 0) {
                 ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
@@ -304,7 +285,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
         final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Fred received key: " + notificationKey);
                 expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
@@ -314,7 +295,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
         final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                 log.info("Barney received key: " + notificationKey);
                 expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
@@ -342,9 +323,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
 
-                queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyFred, internalCallContext);
+                queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyFred, internalCallContext);
                 log.info("posted key: " + notificationKeyFred.toString());
-                queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyBarney, internalCallContext);
+                queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyBarney, internalCallContext);
                 log.info("posted key: " + notificationKeyBarney.toString());
                 return null;
             }
@@ -396,7 +377,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
                                                                             new NotificationQueueHandler() {
                                                                                 @Override
-                                                                                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
                                                                                     if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
                                                                                         log.info("Received notification with key: " + notificationKey);
                                                                                         eventsReceived++;
@@ -416,9 +397,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), accountId, notificationKey2, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), notificationKey2, internalCallContext);
                 return null;
             }
         });
@@ -444,6 +425,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     }
 
     public static class TestNotificationQueueModule extends AbstractModule {
+
         @Override
         protected void configure() {
             bind(Clock.class).to(ClockMock.class);