killbill-aplcache

Details

diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 11721c8..7ee2e10 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.TreeSet;
 
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.clock.Clock;
@@ -33,7 +32,7 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
 public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
 
 
-    private TreeSet<Notification> notifications;
+    private final TreeSet<Notification> notifications;
 
     public MockNotificationQueue(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         super(clock, svcName, queueName, handler, config);
@@ -63,17 +62,27 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     protected void doProcessEvents(int sequenceId) {
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
+        List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        List<Notification> readyNotifications = new ArrayList<Notification>();
         synchronized(notifications) {
             Iterator<Notification> it = notifications.iterator();
             while (it.hasNext()) {
                 Notification cur = it.next();
                 if (cur.isAvailableForProcessing(clock.getUTCNow())) {
-                    handler.handleReadyNotification(cur.getNotificationKey());
-                    DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
-                    it.remove();
-                    processedNotifications.add(processedNotification);
+                    readyNotifications.add(cur);
                 }
             }
+            for (Notification cur : readyNotifications) {
+                handler.handleReadyNotification(cur.getNotificationKey());
+                DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+                oldNotifications.add(cur);
+                processedNotifications.add(processedNotification);
+
+            }
+            if (oldNotifications.size() > 0) {
+                notifications.removeAll(oldNotifications);
+            }
             if (processedNotifications.size() > 0) {
                 notifications.addAll(processedNotifications);
             }