killbill-memoizeit

queue: improve performance of search APIs Stream results

2/24/2017 10:10:04 AM

Details

diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index 590f3f1..cc85579 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,7 +40,6 @@ import org.killbill.billing.ErrorCode;
 import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
-import org.killbill.billing.account.api.AccountData;
 import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.account.api.ImmutableAccountData;
 import org.killbill.billing.callcontext.InternalCallContext;
@@ -695,26 +695,21 @@ public class InvoiceDispatcher {
         try {
             final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                                                       DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
-            final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
-            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> allUpcomingEvents = Iterables.filter(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
-                @Override
-                public boolean apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
+            final Collection<DateTime> effectiveDates = new LinkedList<DateTime>();
+            for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+                final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
 
-                    final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
-                                                                  input.getEvent().isDryRunForInvoiceNotification() : false;
-                    return isEventForSubscription && !isEventDryRunForNotifications;
+                final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+                                                              input.getEvent().isDryRunForInvoiceNotification() : false;
+                if (isEventForSubscription && !isEventDryRunForNotifications) {
+                    effectiveDates.add(input.getEffectiveDate());
                 }
-            });
 
-            return Iterables.transform(allUpcomingEvents, new Function<NotificationEventWithMetadata<NextBillingDateNotificationKey>, DateTime>() {
-                @Nullable
-                @Override
-                public DateTime apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    return input.getEffectiveDate();
-                }
-            });
+            }
+
+            return effectiveDates;
         } catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
             throw new IllegalStateException(noSuchNotificationQueue);
         }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
index 9014bdb..6dac86e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -19,7 +19,6 @@
 package org.killbill.billing.invoice.notification;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -34,8 +33,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
@@ -71,23 +68,25 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
 
             // If we see existing notification for the same date (and isDryRunForInvoiceNotification mode), we don't insert a new notification
-            final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
-            final NotificationEventWithMetadata<NextBillingDateNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
-                @Override
-                public boolean apply(final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
-                                                                  input.getEvent().isDryRunForInvoiceNotification() : false;
-
-                    final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
-                    final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
-
-                    return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
-                           ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
-                            (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications));
+            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
+
+            boolean existingFutureNotificationWithSameDate = false;
+            for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+                final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+                                                              input.getEvent().isDryRunForInvoiceNotification() : false;
+
+                final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+                final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+
+                if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
+                    ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
+                     (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications))) {
+                    existingFutureNotificationWithSameDate = true;
                 }
-            }).orNull();
+                // Go through all results to close the connection
+            }
 
-            if (existingFutureNotificationWithSameDate == null) {
+            if (!existingFutureNotificationWithSameDate) {
                 log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
 
                 nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
index afd8816..43870c3 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
 package org.killbill.billing.invoice.notification;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -33,8 +32,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class ParentInvoiceCommitmentPoster {
@@ -58,19 +55,20 @@ public class ParentInvoiceCommitmentPoster {
                                                                                ParentInvoiceCommitmentNotifier.PARENT_INVOICE_COMMITMENT_NOTIFIER_QUEUE);
 
             // If we see existing notification for the same date we don't insert a new notification
-            final List<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
-            final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>>() {
-                @Override
-                public boolean apply(final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input) {
+            final Iterable<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
 
-                    final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
-                    final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+            boolean existingFutureNotificationWithSameDate = false;
+            for (final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input : futureNotifications) {
+                final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+                final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
 
-                    return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0;
+                if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0) {
+                    existingFutureNotificationWithSameDate = true;
                 }
-            }).orNull();
+                // Go through all results to close the connection
+            }
 
-            if (existingFutureNotificationWithSameDate == null) {
+            if (!existingFutureNotificationWithSameDate) {
                 log.info("Queuing parent invoice commitment notification at {} for invoiceId {}", futureNotificationTime.toString(), invoiceId.toString());
 
                 commitInvoiceQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
index b391d41..7542220 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -20,10 +20,6 @@ package org.killbill.billing.jaxrs.resources;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
@@ -90,7 +86,6 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
 import com.google.inject.Singleton;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -152,6 +147,7 @@ public class AdminResource extends JaxRsResourceBase {
                                     @QueryParam("serviceName") final String serviceName,
                                     @QueryParam("withHistory") @DefaultValue("true") final Boolean withHistory,
                                     @QueryParam("minDate") final String minDateOrNull,
+                                    @QueryParam("maxDate") final String maxDateOrNull,
                                     @QueryParam("withInProcessing") @DefaultValue("true") final Boolean withInProcessing,
                                     @QueryParam("withBusEvents") @DefaultValue("true") final Boolean withBusEvents,
                                     @QueryParam("withNotifications") @DefaultValue("true") final Boolean withNotifications,
@@ -161,7 +157,8 @@ public class AdminResource extends JaxRsResourceBase {
         final Long accountRecordId = Strings.isNullOrEmpty(accountIdStr) ? null : recordIdApi.getRecordId(UUID.fromString(accountIdStr), ObjectType.ACCOUNT, tenantContext);
 
         // Limit search results by default
-        final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusMonths(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+        final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusDays(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+        final DateTime maxDate = Strings.isNullOrEmpty(maxDateOrNull) ? clock.getUTCNow().plusDays(2) : DATE_TIME_FORMATTER.parseDateTime(maxDateOrNull).toDateTime(DateTimeZone.UTC);
 
         final StreamingOutput json = new StreamingOutput() {
             @Override
@@ -174,7 +171,7 @@ public class AdminResource extends JaxRsResourceBase {
                 if (withBusEvents) {
                     generator.writeFieldName("busEvents");
                     generator.writeStartArray();
-                    for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
+                    for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId)) {
                         generator.writeObject(new BusEventWithRichMetadata(busEvent));
                     }
                     generator.writeEndArray();
@@ -183,7 +180,7 @@ public class AdminResource extends JaxRsResourceBase {
                 if (withNotifications) {
                     generator.writeFieldName("notifications");
                     generator.writeStartArray();
-                    for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
+                    for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId)) {
                         generator.writeObject(notification);
                     }
                     generator.writeEndArray();
@@ -391,9 +388,10 @@ public class AdminResource extends JaxRsResourceBase {
                                                                                         final boolean includeInProcessing,
                                                                                         final boolean includeHistory,
                                                                                         @Nullable final DateTime minEffectiveDate,
+                                                                                        @Nullable final DateTime maxEffectiveDate,
                                                                                         @Nullable final Long accountRecordId,
                                                                                         final Long tenantRecordId) {
-        final Collection<NotificationEventWithMetadata<NotificationEvent>> notifications = new LinkedList<NotificationEventWithMetadata<NotificationEvent>>();
+        Iterable<NotificationEventWithMetadata<NotificationEvent>> notifications = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>of();
         for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
             if (queueName != null && !queueName.equals(notificationQueue.getQueueName())) {
                 continue;
@@ -401,75 +399,76 @@ public class AdminResource extends JaxRsResourceBase {
                 continue;
             }
 
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationsForQueue;
             if (includeInProcessing) {
                 if (accountRecordId != null) {
-                    notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId));
                 } else {
-                    notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
                 }
             } else {
                 if (accountRecordId != null) {
-                    notificationsForQueue = notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId));
                 } else {
-                    notificationsForQueue = notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
                 }
             }
 
-            notifications.addAll(notificationsForQueue);
-
             if (includeHistory) {
                 if (accountRecordId != null) {
-                    notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId));
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId),
+                                                                                                       notifications);
                 } else {
-                    notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId));
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId),
+                                                                                                       notifications);
                 }
             }
         }
 
-        return Ordering.<NotificationEventWithMetadata<NotificationEvent>>from(new Comparator<NotificationEventWithMetadata<NotificationEvent>>() {
-            @Override
-            public int compare(final NotificationEventWithMetadata<NotificationEvent> o1, final NotificationEventWithMetadata<NotificationEvent> o2) {
-                final int effectiveDateComparison = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
-                return effectiveDateComparison == 0 ? o1.getRecordId().compareTo(o2.getRecordId()) : effectiveDateComparison;
-            }
-        }).sortedCopy(notifications);
+        // Note: entries are properly ordered by queue, but not cross queues unfortunately
+        return notifications;
     }
 
     private Iterable<BusEventWithMetadata<BusEvent>> getBusEvents(final boolean includeInProcessing,
                                                                   final boolean includeHistory,
                                                                   @Nullable final DateTime minCreatedDate,
+                                                                  @Nullable final DateTime maxCreatedDate,
                                                                   @Nullable final Long accountRecordId,
                                                                   final Long tenantRecordId) {
-        final Collection<BusEventWithMetadata<BusEvent>> busEvents = new LinkedList<BusEventWithMetadata<BusEvent>>();
+        Iterable<BusEventWithMetadata<BusEvent>> busEvents = ImmutableList.<BusEventWithMetadata<BusEvent>>of();
         if (includeInProcessing) {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
             } else {
-                busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
             }
         } else {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
             } else {
-                busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
             }
         }
 
         if (includeHistory) {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId),
+                                                                             busEvents);
             } else {
-                busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId),
+                                                                             busEvents);
             }
         }
 
-        return Ordering.<BusEventWithMetadata<BusEvent>>from(new Comparator<BusEventWithMetadata<BusEvent>>() {
-            @Override
-            public int compare(final BusEventWithMetadata<BusEvent> o1, final BusEventWithMetadata<BusEvent> o2) {
-                return o1.getRecordId().compareTo(o2.getRecordId());
-            }
-        }).sortedCopy(busEvents);
+        // Note: entries are properly ordered by queue, but not cross queues unfortunately
+        return busEvents;
     }
 
     private class BusEventWithRichMetadata extends BusEventWithMetadata<BusEvent> {
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
index 9e1f704..261238a 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,9 +18,6 @@
 
 package org.killbill.billing.jaxrs.resources;
 
-import java.util.Collection;
-import java.util.List;
-
 import javax.inject.Inject;
 import javax.servlet.ServletRequest;
 import javax.servlet.http.HttpServletRequest;
@@ -61,9 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -237,30 +232,27 @@ public class TestResource extends JaxRsResourceBase {
     }
 
     private boolean areAllNotificationsProcessed(final Long tenantRecordId) {
-        final Collection<NotificationQueue> filtered = ImmutableList.<NotificationQueue>copyOf(Collections2.<NotificationQueue>filter(notificationQueueService.getNotificationQueues(),
-                                                                                                                                      new Predicate<NotificationQueue>() {
-                                                                                                                                          @Override
-                                                                                                                                          public boolean apply(final NotificationQueue notificationQueue) {
-                                                                                                                                              for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId)) {
-                                                                                                                                                  if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
-                                                                                                                                                      return true;
-                                                                                                                                                  }
-                                                                                                                                              }
-                                                                                                                                              return false;
-                                                                                                                                          }
-                                                                                                                                      }));
-        if (!filtered.isEmpty()) {
-            log.info("TestResource: {} queue(s) with more notification(s) to process", filtered.size());
+        int nbNotifications = 0;
+        for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
+            for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(null, tenantRecordId)) {
+                if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
+                    nbNotifications += 1;
+                }
+            }
+        }
+        if (nbNotifications != 0) {
+            log.info("TestResource: {} queue(s) with more notification(s) to process", nbNotifications);
         }
-        return filtered.isEmpty();
+        return nbNotifications == 0;
     }
 
     private boolean areAllBusEventsProcessed(final Long tenantRecordId) {
-        final List<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId);
-        if (!availableBusEventForSearchKey2.isEmpty()) {
-            log.info("TestResource: at least {} more bus event(s) to process", availableBusEventForSearchKey2.size());
+        final Iterable<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(null, tenantRecordId);
+        final int nbBusEvents = Iterables.<BusEventWithMetadata<BusEvent>>size(availableBusEventForSearchKey2);
+        if (nbBusEvents != 0) {
+            log.info("TestResource: at least {} more bus event(s) to process", nbBusEvents);
         }
-        return availableBusEventForSearchKey2.isEmpty();
+        return nbBusEvents == 0;
     }
 
     private ClockMock getClockMock() {
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
index 463d08c..d06c167 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -66,8 +65,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
                 public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                     // Check if we already have notifications for that key
                     final Class<T> clazz = (Class<T>) notificationKey.getClass();
-                    final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
-                                                                                                                                           clazz, context);
+                    final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
+                                                                                                                                         clazz, context);
 
                     final boolean shouldInsertNewNotification = cleanupFutureNotificationsFormTransaction(entitySqlDaoWrapperFactory, futureNotifications, futureNotificationTime, overdueQueue);
                     if (shouldInsertNewNotification) {
@@ -92,8 +91,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
             transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
                 @Override
                 public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                    final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
-                                                                                                                                           clazz, context);
+                    final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
+                                                                                                                                         clazz, context);
                     for (final NotificationEventWithMetadata<T> notification : futureNotifications) {
                         checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), notification.getRecordId());
                     }
@@ -107,15 +106,15 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
     }
 
     @VisibleForTesting
-    <T extends OverdueCheckNotificationKey> Collection<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                                                       final NotificationQueue checkOverdueQueue,
-                                                                                                                                       final Class<T> clazz,
-                                                                                                                                       final InternalCallContext context) {
+    <T extends OverdueCheckNotificationKey> Iterable<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+                                                                                                                                     final NotificationQueue checkOverdueQueue,
+                                                                                                                                     final Class<T> clazz,
+                                                                                                                                     final InternalCallContext context) {
         return checkOverdueQueue.getFutureNotificationFromTransactionForSearchKeys(context.getAccountRecordId(), context.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
     }
 
     protected abstract <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                                 final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                                 final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                                  final DateTime futureNotificationTime, final NotificationQueue overdueQueue);
 
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
index d6b7a44..124fe69 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
-
 import org.joda.time.DateTime;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -31,6 +29,7 @@ import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.skife.jdbi.v2.IDBI;
 
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
@@ -44,12 +43,13 @@ public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
 
     @Override
     protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                        final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                        final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                         final DateTime futureNotificationTime,
                                                                                                         final NotificationQueue overdueQueue) {
         // If we already have notification for that account we don't insert the new one
         // Note that this is slightly incorrect because we could for instance already have a REFRESH and insert a CLEAR, but if that were the case,
         // if means overdue state would change very rapidly and the behavior would anyway be non deterministic
-        return futureNotifications.isEmpty();
+        // Note: don't use isEmpty() to go through all results to close the connection
+        return Iterables.<NotificationEventWithMetadata<T>>size(futureNotifications) == 0;
     }
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
index 1119e53..25d7eff 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
-
 import org.joda.time.DateTime;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -44,32 +42,31 @@ public class OverdueCheckPoster extends DefaultOverduePosterBase {
 
     @Override
     protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                        final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                        final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                         final DateTime futureNotificationTime, final NotificationQueue overdueQueue) {
 
         boolean shouldInsertNewNotification = true;
-        if (!futureNotifications.isEmpty()) {
+        int minIndexToDeleteFrom = 0;
+        int index = 0;
+        for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
             // Results are ordered by effective date asc
-            final DateTime earliestExistingNotificationDate = futureNotifications.iterator().next().getEffectiveDate();
-
-            final int minIndexToDeleteFrom;
-            if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
-                // We don't have to insert a new one. For sanity, delete any other future notification
-                minIndexToDeleteFrom = 1;
-                shouldInsertNewNotification = false;
-            } else {
-                // We win - we are before any other already recorded. Delete all others.
-                minIndexToDeleteFrom = 0;
+            if (index == 0) {
+                if (cur.getEffectiveDate().isBefore(futureNotificationTime)) {
+                    // We don't have to insert a new one. For sanity, delete any other future notification
+                    minIndexToDeleteFrom = 1;
+                    shouldInsertNewNotification = false;
+                } else {
+                    // We win - we are before any other already recorded. Delete all others.
+                    minIndexToDeleteFrom = 0;
+                }
             }
 
-            int index = 0;
-            for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
-                if (minIndexToDeleteFrom <= index) {
-                    overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
-                }
-                index++;
+            if (minIndexToDeleteFrom <= index) {
+                overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
             }
+            index++;
         }
+
         return shouldInsertNewNotification;
     }
 }
diff --git a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
index 03e91fa..3f8cd00 100644
--- a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -17,24 +19,26 @@
 package org.killbill.billing.overdue.notification;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
 import org.killbill.billing.account.api.Account;
-import org.killbill.notificationq.api.NotificationEventWithMetadata;
-import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
 import org.killbill.billing.overdue.service.DefaultOverdueService;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import org.killbill.billing.util.jackson.ObjectMapper;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
 
@@ -68,7 +72,7 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         insertOverdueCheckAndVerifyQueueContent(overdueable, 15, 5);
 
         // Verify the final content of the queue
-        Assert.assertEquals(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size(), 1);
+        Assert.assertEquals(Iterables.<NotificationEventWithMetadata>size(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())), 1);
     }
 
     private void insertOverdueCheckAndVerifyQueueContent(final Account account, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
@@ -77,18 +81,18 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(account.getId());
         checkPoster.insertOverdueNotification(account.getId(), futureNotificationTime, OverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE, notificationKey, internalCallContext);
 
-        final Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
+        final List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
         Assert.assertEquals(notificationsForKey.size(), 1);
-        final NotificationEventWithMetadata nm = notificationsForKey.iterator().next();
+        final NotificationEventWithMetadata nm = notificationsForKey.get(0);
         Assert.assertEquals(nm.getEvent(), notificationKey);
         Assert.assertEquals(nm.getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
     }
 
-    private Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
-        return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
+    private List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
+        return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<List<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
             @Override
-            public Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                return ((OverdueCheckPoster)checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext);
+            public List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                return ImmutableList.<NotificationEventWithMetadata<OverdueCheckNotificationKey>>copyOf(((OverdueCheckPoster) checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext));
             }
         });
     }
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index f4a0491..b35e99a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -408,14 +408,14 @@ public class PaymentProcessor extends ProcessorBase {
     public void cancelScheduledPaymentTransaction(final UUID lastPaymentAttemptId, final InternalCallContext internalCallContext) throws PaymentApiException {
         try {
             final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
                     retryQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
             for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
                 if (((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId().equals(lastPaymentAttemptId)) {
                     retryQueue.removeNotification(notificationEvent.getRecordId());
-                    break;
                 }
+                // Go through all results to close the connection
             }
         } catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
             log.error("ERROR Loading Notification Queue - " + noSuchNotificationQueue.getMessage());
@@ -767,7 +767,7 @@ public class PaymentProcessor extends ProcessorBase {
         // Get Future Payment Attempts from Notification Queue and add them to the list
         try {
             final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
                     retryQueue.getFutureNotificationForSearchKeys(internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
 
             for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
index 5a137ac..191ed5c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -40,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuiteWithEmbeddedDB {
 
@@ -77,7 +78,7 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
         final JanitorNotificationKey notificationKey = new JanitorNotificationKey(transactionId, incompletePaymentTransactionTask.getClass().toString(), 1);
         final UUID userToken = UUID.randomUUID();
 
-        Assert.assertTrue(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).isEmpty());
+        Assert.assertTrue(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())));
 
         GlobalLock lock = null;
         try {
@@ -85,16 +86,17 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
 
             incompletePaymentTransactionTask.processNotification(notificationKey, userToken, internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
-            final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
-            Assert.assertFalse(futureNotifications.isEmpty());
-            Assert.assertEquals(futureNotifications.get(0).getUserToken(), userToken);
-            Assert.assertEquals(futureNotifications.get(0).getEvent().getClass(), JanitorNotificationKey.class);
-            final JanitorNotificationKey event = (JanitorNotificationKey) futureNotifications.get(0).getEvent();
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            Assert.assertFalse(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(futureNotifications));
+            final NotificationEventWithMetadata<NotificationEvent> notificationEventWithMetadata = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>copyOf(futureNotifications).get(0);
+            Assert.assertEquals(notificationEventWithMetadata.getUserToken(), userToken);
+            Assert.assertEquals(notificationEventWithMetadata.getEvent().getClass(), JanitorNotificationKey.class);
+            final JanitorNotificationKey event = (JanitorNotificationKey) notificationEventWithMetadata.getEvent();
             Assert.assertEquals(event.getUuidKey(), transactionId);
             Assert.assertEquals((int) event.getAttemptNumber(), 2);
 
             // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
-            Assert.assertTrue(futureNotifications.get(0).getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
+            Assert.assertTrue(notificationEventWithMetadata.getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
         } catch (final LockFailedException e) {
             Assert.fail();
         } finally {
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index b8d044b..0fcfb7f 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -71,6 +71,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 import static com.jayway.awaitility.Awaitility.await;
@@ -508,12 +509,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
             await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
+                    boolean completed = true;
                     for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())) {
                         if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
-                            return false;
+                            completed = false;
                         }
+                        // Go through all results to close the connection
                     }
-                    return true;
+                    return completed;
                 }
             });
         } catch (final Exception e) {
@@ -523,7 +526,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
     private int getPendingNotificationCnt(final InternalCallContext internalCallContext) {
         try {
-            return notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size();
+            return Iterables.<NotificationEventWithMetadata>size(notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
         } catch (final Exception e) {
             fail("Test failed ", e);
         }
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
index c72cf42..ccc67eb 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -731,7 +731,7 @@ public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implemen
         try {
             final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultSubscriptionBaseService.SUBSCRIPTION_SERVICE_NAME,
                                                                                                       DefaultSubscriptionBaseService.NOTIFICATION_QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
             return Iterables.transform(futureNotifications, new Function<NotificationEventWithMetadata<NotificationEvent>, DateTime>() {
                 @Nullable
                 @Override