diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 11721c8..7ee2e10 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.TreeSet;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.util.clock.Clock;
@@ -33,7 +32,7 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
- private TreeSet<Notification> notifications;
+ private final TreeSet<Notification> notifications;
public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
super(clock, svcName, queueName, handler, config);
@@ -63,17 +62,27 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
protected void doProcessEvents(int sequenceId) {
List<Notification> processedNotifications = new ArrayList<Notification>();
+ List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ List<Notification> readyNotifications = new ArrayList<Notification>();
synchronized(notifications) {
Iterator<Notification> it = notifications.iterator();
while (it.hasNext()) {
Notification cur = it.next();
if (cur.isAvailableForProcessing(clock.getUTCNow())) {
- handler.handleReadyNotification(cur.getNotificationKey());
- DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
- it.remove();
- processedNotifications.add(processedNotification);
+ readyNotifications.add(cur);
}
}
+ for (Notification cur : readyNotifications) {
+ handler.handleReadyNotification(cur.getNotificationKey());
+ DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ oldNotifications.add(cur);
+ processedNotifications.add(processedNotification);
+
+ }
+ if (oldNotifications.size() > 0) {
+ notifications.removeAll(oldNotifications);
+ }
if (processedNotifications.size() > 0) {
notifications.addAll(processedNotifications);
}