killbill-uncached

overdue: revisit notification queue queries Go with account

2/19/2013 10:20:47 PM

Details

diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 4090fc6..946d8b6 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.ovedue.notification;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.joda.time.DateTime;
@@ -35,11 +36,12 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.util.queue.PersistentQueueBase;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 
 public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
@@ -63,9 +65,6 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
         try {
             checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
-            log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
-
-            final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
 
             transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
                 @Override
@@ -73,10 +72,10 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                     boolean shouldInsertNewNotification = true;
 
                     // Check if we already have notifications for that key
-                    final List<Notification> futureNotificationsForKey = checkOverdueQueue.getFutureNotificationsForKeyFromTransaction(entitySqlDaoWrapperFactory, notificationKey, context);
-                    if (futureNotificationsForKey.size() > 0) {
+                    final List<Notification> futureNotifications = getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue, overdueable, context);
+                    if (futureNotifications.size() > 0) {
                         // Results are ordered by effective date asc
-                        final DateTime earliestExistingNotificationDate = futureNotificationsForKey.get(0).getEffectiveDate();
+                        final DateTime earliestExistingNotificationDate = futureNotifications.get(0).getEffectiveDate();
 
                         final int minIndexToDeleteFrom;
                         if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
@@ -88,13 +87,17 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                             minIndexToDeleteFrom = 0;
                         }
 
-                        for (int i = minIndexToDeleteFrom; i < futureNotificationsForKey.size(); i++) {
-                            checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationsForKey.get(i).getId(), context);
+                        for (int i = minIndexToDeleteFrom; i < futureNotifications.size(); i++) {
+                            checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotifications.get(i).getId(), context);
                         }
                     }
 
                     if (shouldInsertNewNotification) {
+                        log.info("Queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
+                        final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
                         checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, notificationKey, context);
+                    } else {
+                        log.info("Skipping queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
                     }
 
                     return null;
@@ -107,20 +110,44 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
 
     @Override
     public void clearNotificationsFor(final Blockable overdueable, final InternalCallContext context) {
-        final NotificationQueue checkOverdueQueue;
         try {
-            checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
-                                                                              DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
-            final NotificationKey key = new NotificationKey() {
+            final NotificationQueue checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+                                                                                                      DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+            transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+
                 @Override
-                public String toString() {
-                    return overdueable.getId().toString();
+                public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                    final List<Notification> futureNotifications = getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue, overdueable, context);
+                    for (final Notification notification : futureNotifications) {
+                        checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, notification.getId(), context);
+                    }
+
+                    return null;
                 }
-            };
-            checkOverdueQueue.removeNotificationsByKey(key, context);
+            });
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to clear items from a non-existent queue (DefaultOverdueCheck).", e);
         }
     }
 
+    @VisibleForTesting
+    List<Notification> getFutureNotificationsForAccountAndOverdueableInTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory,
+                                                                                   final NotificationQueue checkOverdueQueue,
+                                                                                   final Blockable overdueable,
+                                                                                   final InternalCallContext context) {
+        final List<Notification> notifications = new ArrayList<Notification>();
+
+        final List<Notification> candidates = checkOverdueQueue.getFutureNotificationsForAccountFromTransaction(entitySqlDaoWrapperFactory, context);
+        for (final Notification candidate : candidates) {
+            if (OverdueCheckNotificationKey.class.getName().equals(candidate.getNotificationKeyClass())) {
+                final OverdueCheckNotificationKey key = PersistentQueueBase.deserializeEvent(candidate.getNotificationKeyClass(), candidate.getNotificationKey());
+
+                if (Type.get(overdueable).equals(key.getType()) && overdueable.getId().equals(key.getUuidKey())) {
+                    notifications.add(candidate);
+                }
+            }
+        }
+
+        return notifications;
+    }
 }
diff --git a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
index 4f4363d..d84cbb5 100644
--- a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
@@ -29,9 +29,12 @@ import org.testng.annotations.Test;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.junction.api.Blockable;
-import com.ning.billing.junction.api.Blockable.Type;
 import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.entity.dao.EntitySqlDao;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.jackson.ObjectMapper;
 import com.ning.billing.util.notificationq.Notification;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -40,11 +43,14 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
 
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private EntitySqlDaoTransactionalJdbiWrapper entitySqlDaoTransactionalJdbiWrapper;
     private NotificationQueue overdueQueue;
     private DateTime testReferenceTime;
 
     @BeforeMethod(groups = "slow")
     public void setUp() throws Exception {
+        entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(getDBI(), clock, cacheControllerDispatcher, nonEntityDao);
+
         overdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
                                                                      DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
         Assert.assertTrue(overdueQueue.isStarted());
@@ -71,11 +77,8 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         verifyQueueContent(otherOverdueable, 5, 5);
         verifyQueueContent(otherOverdueable, 15, 5);
 
-        // Verify the final content of the queue for each key
-        final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(subscriptionId, Type.SUBSCRIPTION);
-        Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext).size(), 1);
-        final OverdueCheckNotificationKey otherNotificationKey = new OverdueCheckNotificationKey(bundleId, Type.SUBSCRIPTION_BUNDLE);
-        Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(otherNotificationKey, internalCallContext).size(), 1);
+        // Verify the final content of the queue
+        Assert.assertEquals(overdueQueue.getFutureNotificationsForAccount(internalCallContext).size(), 2);
     }
 
     private void verifyQueueContent(final Blockable overdueable, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
@@ -83,9 +86,18 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         poster.insertOverdueCheckNotification(overdueable, futureNotificationTime, internalCallContext);
 
         final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable));
-        final List<Notification> notificationsForKey = overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext);
+        final List<Notification> notificationsForKey = getNotificationsForOverdueable(overdueable);
         Assert.assertEquals(notificationsForKey.size(), 1);
         Assert.assertEquals(notificationsForKey.get(0).getNotificationKey(), objectMapper.writeValueAsString(notificationKey));
         Assert.assertEquals(notificationsForKey.get(0).getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
     }
+
+    private List<Notification> getNotificationsForOverdueable(final Blockable overdueable) {
+        return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<List<Notification>>() {
+            @Override
+            public List<Notification> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                return ((DefaultOverdueCheckPoster) poster).getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, overdueQueue, overdueable, internalCallContext);
+            }
+        });
+    }
 }
diff --git a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
index a3c4e12..b7ab466 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
@@ -30,7 +30,9 @@ import com.ning.billing.overdue.calculator.BillingStateCalculatorBundle;
 import com.ning.billing.overdue.glue.TestOverdueModuleWithEmbeddedDB;
 import com.ning.billing.overdue.service.DefaultOverdueService;
 import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -58,6 +60,8 @@ public abstract class OverdueTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected EntitlementInternalApi entitlementApi;
     @Inject
+    protected CacheControllerDispatcher cacheControllerDispatcher;
+    @Inject
     protected InternalBus bus;
     @Inject
     protected InternalCallContextFactory internalCallContextFactory;
@@ -80,6 +84,8 @@ public abstract class OverdueTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     @Inject
     protected OverdueWrapperFactory overdueWrapperFactory;
     @Inject
+    protected NonEntityDao nonEntityDao;
+    @Inject
     protected TestOverdueHelper testOverdueHelper;
 
     @BeforeClass(groups = "slow")
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 78c8a35..38a3086 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -32,7 +32,7 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate;
 import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
 import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
 import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
-import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import com.ning.billing.util.callcontext.InternalCallContext;
@@ -44,12 +44,9 @@ import com.ning.billing.util.notificationq.DefaultNotification;
 import com.ning.billing.util.notificationq.Notification;
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
-@ExternalizedSqlViaStringTemplate3()
+@UseStringTemplate3StatementLocator()
 public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
 
-    //
-    // APIs for event notifications
-    //
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
     public List<Notification> getReadyNotifications(@Bind("now") Date now,
@@ -63,10 +60,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
 
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
-    public List<Notification> getFutureNotificationsForKey(@Bind("notificationKey") String key,
-                                                           @Bind("className") String className,
-                                                           @Bind("queueName") String queueName,
-                                                           @InternalTenantContextBinder final InternalTenantContext context);
+    public List<Notification> getFutureNotificationsForAccount(@Bind("now") Date now,
+                                                               @Bind("queueName") String queueName,
+                                                               @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
     public void removeNotification(@Bind("id") String id,
@@ -85,10 +81,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
                                   @InternalTenantContextBinder final InternalCallContext context);
 
     @SqlUpdate
-    public void removeNotificationsByKey(@Bind("notificationKey") String key,
-                                         @InternalTenantContextBinder final InternalCallContext context);
-
-    @SqlUpdate
     public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt,
                                    @InternalTenantContextBinder final InternalCallContext context);
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index a414463..563939e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -22,13 +22,10 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.ning.billing.util.Hostname;
-import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
@@ -38,36 +35,28 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class DefaultNotificationQueue implements NotificationQueue {
 
-    private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
-
     private final NotificationSqlDao dao;
     private final String hostname;
-
     private final String svcName;
     private final String queueName;
-
-    private final ObjectMapper objectMapper;
-
     private final NotificationQueueHandler handler;
-
     private final NotificationQueueService notificationQueueService;
-    private final NonEntityDao nonEntityDao;
-    private final CacheControllerDispatcher cacheControllerDispatcher;
+    private final ObjectMapper objectMapper;
+    private final Clock clock;
 
     private volatile boolean isStarted;
 
     public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
                                     final IDBI dbi, final NotificationQueueService notificationQueueService,
-                                    final NonEntityDao nonEntityDao, final CacheControllerDispatcher cacheControllerDispatcher) {
+                                    final Clock clock) {
         this.svcName = svcName;
         this.queueName = queueName;
         this.handler = handler;
-        this.nonEntityDao = nonEntityDao;
-        this.cacheControllerDispatcher = cacheControllerDispatcher;
         this.dao = dbi.onDemand(NotificationSqlDao.class);
         this.hostname = Hostname.get();
         this.notificationQueueService = notificationQueueService;
         this.objectMapper = new ObjectMapper();
+        this.clock = clock;
     }
 
     @Override
@@ -99,31 +88,19 @@ public class DefaultNotificationQueue implements NotificationQueue {
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
-        // TODO Pierre Don't we want to check for the notification key class and queue name as well?
-        dao.removeNotificationsByKey(notificationKey.toString(), context);
-    }
-
-    @Override
-    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
-        return getFutureNotificationsForKeyInternal(dao, notificationKey, context);
+    public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context) {
+        return getFutureNotificationsForAccountInternal(dao, context);
     }
 
     @Override
-    public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
-                                                                          final NotificationKey notificationKey, final InternalCallContext context) {
+    public List<Notification> getFutureNotificationsForAccountFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                              final InternalCallContext context) {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
-        return getFutureNotificationsForKeyInternal(transactionalNotificationDao, notificationKey, context);
+        return getFutureNotificationsForAccountInternal(transactionalNotificationDao, context);
     }
 
-    private List<Notification> getFutureNotificationsForKeyInternal(final NotificationSqlDao transactionalDao,
-                                                                    final NotificationKey notificationKey, final InternalCallContext context) {
-        try {
-            final String json = objectMapper.writeValueAsString(notificationKey);
-            return transactionalDao.getFutureNotificationsForKey(json, notificationKey.getClass().getName(), getFullQName(), context);
-        } catch (IOException e) {
-            throw new IllegalArgumentException(e);
-        }
+    private List<Notification> getFutureNotificationsForAccountInternal(final NotificationSqlDao transactionalDao, final InternalCallContext context) {
+        return transactionalDao.getFutureNotificationsForAccount(clock.getUTCNow().toDate(), getFullQName(), context);
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index d1a31db..88d17cd 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -18,34 +18,26 @@ package com.ning.billing.util.notificationq;
 
 import org.skife.jdbi.v2.IDBI;
 
-import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.dao.NonEntityDao;
 
 import com.google.inject.Inject;
 
 public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
 
     private final IDBI dbi;
-    private final NonEntityDao nonEntityDao;
-    private final CacheControllerDispatcher cacheControllerDispatcher;
-
 
     @Inject
     public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
-                                           final InternalCallContextFactory internalCallContextFactory, final NonEntityDao nonEntityDao, final CacheControllerDispatcher cacheControllerDispatcher) {
+                                           final InternalCallContextFactory internalCallContextFactory) {
         super(clock, config, dbi, internalCallContextFactory);
         this.dbi = dbi;
-        this.nonEntityDao = nonEntityDao;
-        this.cacheControllerDispatcher = cacheControllerDispatcher;
     }
 
-
     @Override
     protected NotificationQueue createNotificationQueueInternal(final String svcName,
                                                                 final String queueName,
                                                                 final NotificationQueueHandler handler) {
-        return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this, nonEntityDao, cacheControllerDispatcher);
+        return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this, clock);
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index f009a6c..afb4cc5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -55,33 +55,23 @@ public interface NotificationQueue extends QueueLifecycle {
             throws IOException;
 
     /**
-     * Remove all notifications associated with this key
-     */
-    public void removeNotificationsByKey(final NotificationKey notificationKey,
-                                         final InternalCallContext context);
-
-    /**
-     * Retrieve all future pending notifications for a given key
+     * Retrieve all future pending notifications for a given account (taken from the context)
      * Results are ordered by effective date asc.
      *
-     * @param notificationKey notification key to look for
-     * @param context         internal call context
+     * @param context internal call context
      * @return future notifications matching that key
      */
-    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey,
-                                                           final InternalCallContext context);
+    public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context);
 
     /**
-     * Retrieve all future pending notifications for a given key in a transaction.
+     * Retrieve all future pending notifications for a given account (taken from the context) in a transaction.
      * Results are ordered by effective date asc.
      *
-     * @param notificationKey notification key to look for
-     * @param context         internal call context
+     * @param context internal call context
      * @return future notifications matching that key
      */
-    public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
-                                                                          final NotificationKey notificationKey,
-                                                                          final InternalCallContext context);
+    public List<Notification> getFutureNotificationsForAccountFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                              final InternalCallContext context);
 
     public void removeNotification(final UUID notificationId,
                                    final InternalCallContext context);
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index d3706bb..247cf8f 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -41,7 +41,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
-    protected final ObjectMapper objectMapper;
+    protected static final ObjectMapper objectMapper = new ObjectMapper();
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -54,7 +54,6 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         this.nbThreads = nbThreads;
         this.svcQName = svcQName;
         this.config = config;
-        this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
         this.isProcessingSuspended = new AtomicBoolean(false);
@@ -188,8 +187,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         return isProcessingSuspended.get();
     }
 
-
-    protected <T> T deserializeEvent(final String className, final String json) {
+    // TODO PIERRE Better API?
+    public static <T> T deserializeEvent(final String className, final String json) {
         try {
             final Class<?> claz = Class.forName(className);
             return (T) objectMapper.readValue(json, claz);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 71482a8..3afc5e3 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -44,7 +44,7 @@ getPendingCountNotifications() ::= <<
     ;
 >>
 
-getFutureNotificationsForKey() ::= <<
+getFutureNotificationsForAccount() ::= <<
 select
     record_id
   , id
@@ -62,10 +62,10 @@ select
   , account_record_id
   , tenant_record_id
 from notifications
-where notification_key = :notificationKey
-and class_name = :className
-and queue_name = :queueName
+where queue_name = :queueName
 and processing_state != 'REMOVED'
+and account_record_id = :accountRecordId
+and effective_date >= :now
 order by effective_date, record_id
 ;
 >>
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c62a365..4c5c817 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -88,33 +88,17 @@ public class MockNotificationQueue implements NotificationQueue {
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
-        final List<Notification> toClearNotifications = new ArrayList<Notification>();
-        for (final Notification notification : notifications) {
-            if (notification.getNotificationKey().equals(key.toString())) {
-                toClearNotifications.add(notification);
-            }
-        }
-
-        synchronized (notifications) {
-            if (toClearNotifications.size() > 0) {
-                notifications.removeAll(toClearNotifications);
-            }
-        }
-    }
-
-    @Override
-    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
-        return getFutureNotificationsForKeyFromTransaction(null, notificationKey, context);
+    public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context) {
+        return getFutureNotificationsForAccountFromTransaction(null, context);
     }
 
     @Override
-    public List<Notification> getFutureNotificationsForKeyFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
-                                                                          final NotificationKey notificationKey, final InternalCallContext context) {
+    public List<Notification> getFutureNotificationsForAccountFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                              final InternalCallContext context) {
         final List<Notification> result = new ArrayList<Notification>();
         synchronized (notifications) {
             for (final Notification notification : notifications) {
-                if (notificationKey.toString().equals(notification.getNotificationKey()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
+                if (notification.getAccountRecordId().equals(context.getAccountRecordId()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
                     result.add(notification);
                 }
             }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 6bd9ed1..3018299 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
@@ -90,6 +89,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         super.setup();
         entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(getDBI(), clock, cacheControllerDispatcher, nonEntityDao);
     }
+
     @Override
     @BeforeMethod(groups = "slow")
     public void setupTest() throws Exception {
@@ -341,95 +341,4 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred));
         Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
     }
-
-    @Test(groups = "slow")
-    public void testRemoveNotifications() throws Exception {
-        final UUID key = UUID.randomUUID();
-        final NotificationKey notificationKey = new TestNotificationKey(key.toString());
-        final UUID key2 = UUID.randomUUID();
-        final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
-
-        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
-                                                                             "remove",
-                                                                             new NotificationQueueHandler() {
-                                                                                 @Override
-                                                                                 public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                     if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
-                                                                                         log.info("Received notification with key: " + notificationKey);
-                                                                                         eventsReceived++;
-                                                                                     }
-                                                                                 }
-                                                                             });
-        queue.startQueue();
-
-        final DateTime start = clock.getUTCNow().plusHours(1);
-        final int nextReadyTimeIncrementMs = 1000;
-
-        // add 3 events
-
-        entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
-            @Override
-            public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), notificationKey, internalCallContext);
-                queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), notificationKey2, internalCallContext);
-                return null;
-            }
-        });
-
-        queue.removeNotificationsByKey(notificationKey, internalCallContext); // should remove 2 of the 3
-
-        // Move time in the future after the notification effectiveDate
-        ((ClockMock) clock).setDeltaFromReality(4000000 + nextReadyTimeIncrementMs * 3);
-
-        try {
-            await().atMost(10, TimeUnit.SECONDS).until(new Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    return eventsReceived >= 2;
-                }
-            });
-            Assert.fail("There should only have been only one event left in the queue we got: " + eventsReceived);
-        } catch (Exception e) {
-            // expected behavior
-        }
-        log.info("Received " + eventsReceived + " events");
-        queue.stopQueue();
-    }
-
-    static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
-        return new NotificationQueueConfig() {
-            @Override
-            public boolean isProcessingOff() {
-                return off;
-            }
-
-            @Override
-            public int getPrefetchAmount() {
-                return 10;
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return sleepTime;
-            }
-        };
-    }
-
-    /*
-    public static class TestNotificationQueueModule extends AbstractModule {
-
-        @Override
-        protected void configure() {
-            bind(Clock.class).to(ClockMock.class).asEagerSingleton();
-
-            final IDBI dbi = getDBI();
-            bind(IDBI.class).toInstance(dbi);
-            final IDBI otherDbi = getDBI();
-            bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
-            bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
-            bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
-        }
-    }
-    */
 }