killbill-uncached

overdue: limit the number of notifications Each time overdue

2/18/2013 10:32:22 PM

Details

diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index b2e91c5..4090fc6 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,15 +16,25 @@
 
 package com.ning.billing.ovedue.notification;
 
-import java.io.IOException;
+import java.util.List;
 
 import org.joda.time.DateTime;
+import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.billing.junction.api.Blockable;
+import com.ning.billing.junction.api.Blockable.Type;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.dao.EntitySqlDao;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.Notification;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -33,15 +43,18 @@ import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotifi
 import com.google.inject.Inject;
 
 public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
+
     private static final Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
 
     private final NotificationQueueService notificationQueueService;
+    private final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao;
 
     @Inject
-    public DefaultOverdueCheckPoster(
-            final NotificationQueueService notificationQueueService) {
-        super();
+    public DefaultOverdueCheckPoster(final NotificationQueueService notificationQueueService,
+                                     final IDBI dbi, final Clock clock,
+                                     final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
         this.notificationQueueService = notificationQueueService;
+        this.transactionalSqlDao = new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao);
     }
 
     @Override
@@ -52,15 +65,46 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
             log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
 
-            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
+            final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
+
+            transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+                @Override
+                public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                    boolean shouldInsertNewNotification = true;
+
+                    // Check if we already have notifications for that key
+                    final List<Notification> futureNotificationsForKey = checkOverdueQueue.getFutureNotificationsForKeyFromTransaction(entitySqlDaoWrapperFactory, notificationKey, context);
+                    if (futureNotificationsForKey.size() > 0) {
+                        // Results are ordered by effective date asc
+                        final DateTime earliestExistingNotificationDate = futureNotificationsForKey.get(0).getEffectiveDate();
+
+                        final int minIndexToDeleteFrom;
+                        if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
+                            // We don't have to insert a new one. For sanity, delete any other future notification
+                            minIndexToDeleteFrom = 1;
+                            shouldInsertNewNotification = false;
+                        } else {
+                            // We win - we are before any other already recorded. Delete all others.
+                            minIndexToDeleteFrom = 0;
+                        }
+
+                        for (int i = minIndexToDeleteFrom; i < futureNotificationsForKey.size(); i++) {
+                            checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationsForKey.get(i).getId(), context);
+                        }
+                    }
+
+                    if (shouldInsertNewNotification) {
+                        checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, notificationKey, context);
+                    }
+
+                    return null;
+                }
+            });
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
-        } catch (IOException e) {
-            log.error("Failed to serialize notifcationKey for {}", overdueable.toString());
         }
     }
 
-
     @Override
     public void clearNotificationsFor(final Blockable overdueable, final InternalCallContext context) {
         final NotificationQueue checkOverdueQueue;
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
index 1c0d441..9768a62 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
@@ -26,5 +26,4 @@ public interface OverdueCheckPoster {
     void insertOverdueCheckNotification(Blockable blockable, DateTime futureNotificationTime, final InternalCallContext context);
 
     void clearNotificationsFor(Blockable blockable, final InternalCallContext context);
-
 }
diff --git a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
new file mode 100644
index 0000000..4f4363d
--- /dev/null
+++ b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.junction.api.Blockable;
+import com.ning.billing.junction.api.Blockable.Type;
+import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.jackson.ObjectMapper;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationQueue;
+
+public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private NotificationQueue overdueQueue;
+    private DateTime testReferenceTime;
+
+    @BeforeMethod(groups = "slow")
+    public void setUp() throws Exception {
+        overdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+                                                                     DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+        Assert.assertTrue(overdueQueue.isStarted());
+
+        testReferenceTime = clock.getUTCNow();
+    }
+
+    @Test(groups = "slow")
+    public void testShouldntInsertMultipleNotificationsPerOverdueable() throws Exception {
+        final UUID subscriptionId = UUID.randomUUID();
+        final Blockable overdueable = Mockito.mock(Subscription.class);
+        Mockito.when(overdueable.getId()).thenReturn(subscriptionId);
+
+        verifyQueueContent(overdueable, 10, 10);
+        verifyQueueContent(overdueable, 5, 5);
+        verifyQueueContent(overdueable, 15, 5);
+
+        // Check we don't conflict with other overdueables
+        final UUID bundleId = UUID.randomUUID();
+        final Blockable otherOverdueable = Mockito.mock(SubscriptionBundle.class);
+        Mockito.when(otherOverdueable.getId()).thenReturn(bundleId);
+
+        verifyQueueContent(otherOverdueable, 10, 10);
+        verifyQueueContent(otherOverdueable, 5, 5);
+        verifyQueueContent(otherOverdueable, 15, 5);
+
+        // Verify the final content of the queue for each key
+        final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(subscriptionId, Type.SUBSCRIPTION);
+        Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext).size(), 1);
+        final OverdueCheckNotificationKey otherNotificationKey = new OverdueCheckNotificationKey(bundleId, Type.SUBSCRIPTION_BUNDLE);
+        Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(otherNotificationKey, internalCallContext).size(), 1);
+    }
+
+    private void verifyQueueContent(final Blockable overdueable, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
+        final DateTime futureNotificationTime = testReferenceTime.plusDays(nbDaysInFuture);
+        poster.insertOverdueCheckNotification(overdueable, futureNotificationTime, internalCallContext);
+
+        final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable));
+        final List<Notification> notificationsForKey = overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext);
+        Assert.assertEquals(notificationsForKey.size(), 1);
+        Assert.assertEquals(notificationsForKey.get(0).getNotificationKey(), objectMapper.writeValueAsString(notificationKey));
+        Assert.assertEquals(notificationsForKey.get(0).getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 4e59096..78c8a35 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -63,9 +63,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
 
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
-    public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
-                                                               @Bind("effectiveDate") final Date effectiveDate,
-                                                               @InternalTenantContextBinder final InternalTenantContext context);
+    public List<Notification> getFutureNotificationsForKey(@Bind("notificationKey") String key,
+                                                           @Bind("className") String className,
+                                                           @Bind("queueName") String queueName,
+                                                           @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlUpdate
     public void removeNotification(@Bind("id") String id,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 208a211..a414463 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -18,20 +18,14 @@ package com.ning.billing.util.notificationq;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.ning.billing.ObjectType;
 import com.ning.billing.util.Hostname;
-
-import com.ning.billing.util.cache.Cachable.CacheType;
 import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.dao.NonEntityDao;
@@ -41,13 +35,11 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 
 public class DefaultNotificationQueue implements NotificationQueue {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
-    private final IDBI dbi;
     private final NotificationSqlDao dao;
     private final String hostname;
 
@@ -62,7 +54,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
     private final NonEntityDao nonEntityDao;
     private final CacheControllerDispatcher cacheControllerDispatcher;
 
-
     private volatile boolean isStarted;
 
     public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
@@ -71,7 +62,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
         this.svcName = svcName;
         this.queueName = queueName;
         this.handler = handler;
-        this.dbi = dbi;
         this.nonEntityDao = nonEntityDao;
         this.cacheControllerDispatcher = cacheControllerDispatcher;
         this.dao = dbi.onDemand(NotificationSqlDao.class);
@@ -80,7 +70,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
         this.objectMapper = new ObjectMapper();
     }
 
-
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime,
                                          final NotificationKey notificationKey,
@@ -109,20 +98,31 @@ public class DefaultNotificationQueue implements NotificationQueue {
         thisDao.insertNotification(notification, context);
     }
 
-
-
     @Override
     public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
+        // TODO Pierre Don't we want to check for the notification key class and queue name as well?
         dao.removeNotificationsByKey(notificationKey.toString(), context);
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        final Long accountRecordId = nonEntityDao.retrieveRecordIdFromObject(accountId, ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.RECORD_ID));
-        if (accountId == null) {
-            return ImmutableList.<Notification>of();
-        } else {
-            return dao.getNotificationForAccountAndDate(accountRecordId, effectiveDate.toDate(), context);
+    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
+        return getFutureNotificationsForKeyInternal(dao, notificationKey, context);
+    }
+
+    @Override
+    public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                          final NotificationKey notificationKey, final InternalCallContext context) {
+        final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
+        return getFutureNotificationsForKeyInternal(transactionalNotificationDao, notificationKey, context);
+    }
+
+    private List<Notification> getFutureNotificationsForKeyInternal(final NotificationSqlDao transactionalDao,
+                                                                    final NotificationKey notificationKey, final InternalCallContext context) {
+        try {
+            final String json = objectMapper.writeValueAsString(notificationKey);
+            return transactionalDao.getFutureNotificationsForKey(json, notificationKey.getClass().getName(), getFullQName(), context);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
         }
     }
 
@@ -132,6 +132,12 @@ public class DefaultNotificationQueue implements NotificationQueue {
     }
 
     @Override
+    public void removeNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final UUID notificationId, final InternalCallContext context) {
+        final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
+        transactionalNotificationDao.removeNotification(notificationId.toString(), context);
+    }
+
+    @Override
     public String getFullQName() {
         return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 58b49c4..f009a6c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -60,13 +60,35 @@ public interface NotificationQueue extends QueueLifecycle {
     public void removeNotificationsByKey(final NotificationKey notificationKey,
                                          final InternalCallContext context);
 
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId,
-                                                               final DateTime effectiveDate,
-                                                               final InternalCallContext context);
+    /**
+     * Retrieve all future pending notifications for a given key
+     * Results are ordered by effective date asc.
+     *
+     * @param notificationKey notification key to look for
+     * @param context         internal call context
+     * @return future notifications matching that key
+     */
+    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey,
+                                                           final InternalCallContext context);
+
+    /**
+     * Retrieve all future pending notifications for a given key in a transaction.
+     * Results are ordered by effective date asc.
+     *
+     * @param notificationKey notification key to look for
+     * @param context         internal call context
+     * @return future notifications matching that key
+     */
+    public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                          final NotificationKey notificationKey,
+                                                                          final InternalCallContext context);
 
     public void removeNotification(final UUID notificationId,
                                    final InternalCallContext context);
 
+    public void removeNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                  final UUID notificationId,
+                                                  final InternalCallContext context);
 
     /**
      * @return the name of that queue
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index d2266b9..71482a8 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -44,27 +44,30 @@ getPendingCountNotifications() ::= <<
     ;
 >>
 
-getNotificationForAccountAndDate() ::= <<
-   select
-       record_id
-     , id
-     , class_name
-     , notification_key
-     , user_token
-     , future_user_token
-     , created_date
-     , creating_owner
-     , effective_date
-     , queue_name
-     , processing_owner
-     , processing_available_date
-     , processing_state
-     , account_record_id
-     , tenant_record_id
-   from notifications
-   where
-   account_record_id = :accountRecordId AND effective_date = :effectiveDate
-   ;
+getFutureNotificationsForKey() ::= <<
+select
+    record_id
+  , id
+  , class_name
+  , notification_key
+  , user_token
+  , future_user_token
+  , created_date
+  , creating_owner
+  , effective_date
+  , queue_name
+  , processing_owner
+  , processing_available_date
+  , processing_state
+  , account_record_id
+  , tenant_record_id
+from notifications
+where notification_key = :notificationKey
+and class_name = :className
+and queue_name = :queueName
+and processing_state != 'REMOVED'
+order by effective_date, record_id
+;
 >>
 
 removeNotification()  ::= <<
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 1ef9b84..7bab177 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -25,7 +25,6 @@ import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
@@ -96,33 +95,6 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
         validateDate(notification.getNextAvailableDate(), nextAvailable);
     }
 
-    @Test(groups = "slow")
-    public void testGetByAccountAndDate() throws InterruptedException {
-        final long accountRecordId = 1242L;
-        final String notificationKey = UUID.randomUUID().toString();
-        final DateTime effDt = new DateTime();
-        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
-                                                            accountRecordId, internalCallContext.getTenantRecordId());
-        dao.insertNotification(notif1, internalCallContext);
-
-        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
-                                                            accountRecordId, internalCallContext.getTenantRecordId());
-        dao.insertNotification(notif2, internalCallContext);
-
-        List<Notification> notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
-        assertEquals(notifications.size(), 2);
-        for (final Notification cur : notifications) {
-            Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
-            dao.removeNotification(cur.getId().toString(), internalCallContext);
-        }
-
-        notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
-        assertEquals(notifications.size(), 2);
-        for (final Notification cur : notifications) {
-            Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
-        }
-    }
-
     private Notification fetchNotification(final String notificationId) {
         return getDBI().withHandle(new HandleCallback<Notification>() {
             @Override
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index ac267a7..c62a365 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.TreeSet;
 import java.util.UUID;
 
+import javax.annotation.Nullable;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.Hostname;
@@ -85,7 +87,6 @@ public class MockNotificationQueue implements NotificationQueue {
         recordFutureNotification(futureNotificationTime, notificationKey, context);
     }
 
-
     @Override
     public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
         final List<Notification> toClearNotifications = new ArrayList<Notification>();
@@ -103,33 +104,40 @@ public class MockNotificationQueue implements NotificationQueue {
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        /*
+    public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
+        return getFutureNotificationsForKeyFromTransaction(null, notificationKey, context);
+    }
+
+    @Override
+    public List<Notification> getFutureNotificationsForKeyFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+                                                                          final NotificationKey notificationKey, final InternalCallContext context) {
         final List<Notification> result = new ArrayList<Notification>();
         synchronized (notifications) {
-            for (Notification cur : notifications) {
-                if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
-                    result.add(cur);
+            for (final Notification notification : notifications) {
+                if (notificationKey.toString().equals(notification.getNotificationKey()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
+                    result.add(notification);
                 }
             }
         }
+
         return result;
-        */
-        return null;
     }
 
-
     @Override
     public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+        removeNotificationFromTransaction(null, notificationId, context);
+    }
+
+    @Override
+    public void removeNotificationFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final UUID notificationId, final InternalCallContext context) {
         synchronized (notifications) {
-            for (Notification cur : notifications) {
+            for (final Notification cur : notifications) {
                 if (cur.getId().equals(notificationId)) {
                     notifications.remove(cur);
                     break;
                 }
             }
         }
-
     }
 
     @Override
@@ -169,12 +177,7 @@ public class MockNotificationQueue implements NotificationQueue {
         return isStarted;
     }
 
-
     public List<Notification> getReadyNotifications() {
-        final int result;
-        final List<Notification> processedNotifications = new ArrayList<Notification>();
-        final List<Notification> oldNotifications = new ArrayList<Notification>();
-
         final List<Notification> readyNotifications = new ArrayList<Notification>();
         synchronized (notifications) {
             for (final Notification cur : notifications) {