killbill-memoizeit
Changes
invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java 39(+19 -20)
invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java 26(+12 -14)
overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java 23(+11 -12)
overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java 12(+6 -6)
overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java 36(+20 -16)
Details
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index 590f3f1..cc85579 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,7 +40,6 @@ import org.killbill.billing.ErrorCode;
import org.killbill.billing.ObjectType;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.account.api.AccountApiException;
-import org.killbill.billing.account.api.AccountData;
import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.account.api.ImmutableAccountData;
import org.killbill.billing.callcontext.InternalCallContext;
@@ -695,26 +695,21 @@ public class InvoiceDispatcher {
try {
final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
- final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
- final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> allUpcomingEvents = Iterables.filter(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
- @Override
- public boolean apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
- final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
+ final Collection<DateTime> effectiveDates = new LinkedList<DateTime>();
+ for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+ final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
- final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
- input.getEvent().isDryRunForInvoiceNotification() : false;
- return isEventForSubscription && !isEventDryRunForNotifications;
+ final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+ input.getEvent().isDryRunForInvoiceNotification() : false;
+ if (isEventForSubscription && !isEventDryRunForNotifications) {
+ effectiveDates.add(input.getEffectiveDate());
}
- });
- return Iterables.transform(allUpcomingEvents, new Function<NotificationEventWithMetadata<NextBillingDateNotificationKey>, DateTime>() {
- @Nullable
- @Override
- public DateTime apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
- return input.getEffectiveDate();
- }
- });
+ }
+
+ return effectiveDates;
} catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
throw new IllegalStateException(noSuchNotificationQueue);
}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
index 9014bdb..6dac86e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -19,7 +19,6 @@
package org.killbill.billing.invoice.notification;
import java.io.IOException;
-import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -34,8 +33,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
import com.google.inject.Inject;
public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
@@ -71,23 +68,25 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
// If we see existing notification for the same date (and isDryRunForInvoiceNotification mode), we don't insert a new notification
- final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
- final NotificationEventWithMetadata<NextBillingDateNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
- @Override
- public boolean apply(final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
- final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
- input.getEvent().isDryRunForInvoiceNotification() : false;
-
- final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
- final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
-
- return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
- ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
- (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications));
+ final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
+
+ boolean existingFutureNotificationWithSameDate = false;
+ for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+ final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+ input.getEvent().isDryRunForInvoiceNotification() : false;
+
+ final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+ final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+
+ if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
+ ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
+ (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications))) {
+ existingFutureNotificationWithSameDate = true;
}
- }).orNull();
+ // Go through all results to close the connection
+ }
- if (existingFutureNotificationWithSameDate == null) {
+ if (!existingFutureNotificationWithSameDate) {
log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
index afd8816..43870c3 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
@@ -1,6 +1,6 @@
/*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
package org.killbill.billing.invoice.notification;
import java.io.IOException;
-import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -33,8 +32,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
import com.google.inject.Inject;
public class ParentInvoiceCommitmentPoster {
@@ -58,19 +55,20 @@ public class ParentInvoiceCommitmentPoster {
ParentInvoiceCommitmentNotifier.PARENT_INVOICE_COMMITMENT_NOTIFIER_QUEUE);
// If we see existing notification for the same date we don't insert a new notification
- final List<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
- final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>>() {
- @Override
- public boolean apply(final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input) {
+ final Iterable<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
- final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
- final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+ boolean existingFutureNotificationWithSameDate = false;
+ for (final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input : futureNotifications) {
+ final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+ final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
- return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0;
+ if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0) {
+ existingFutureNotificationWithSameDate = true;
}
- }).orNull();
+ // Go through all results to close the connection
+ }
- if (existingFutureNotificationWithSameDate == null) {
+ if (!existingFutureNotificationWithSameDate) {
log.info("Queuing parent invoice commitment notification at {} for invoiceId {}", futureNotificationTime.toString(), invoiceId.toString());
commitInvoiceQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
index b391d41..7542220 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
@@ -1,6 +1,6 @@
/*
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -20,10 +20,6 @@ package org.killbill.billing.jaxrs.resources;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
@@ -90,7 +86,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
import com.google.inject.Singleton;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -152,6 +147,7 @@ public class AdminResource extends JaxRsResourceBase {
@QueryParam("serviceName") final String serviceName,
@QueryParam("withHistory") @DefaultValue("true") final Boolean withHistory,
@QueryParam("minDate") final String minDateOrNull,
+ @QueryParam("maxDate") final String maxDateOrNull,
@QueryParam("withInProcessing") @DefaultValue("true") final Boolean withInProcessing,
@QueryParam("withBusEvents") @DefaultValue("true") final Boolean withBusEvents,
@QueryParam("withNotifications") @DefaultValue("true") final Boolean withNotifications,
@@ -161,7 +157,8 @@ public class AdminResource extends JaxRsResourceBase {
final Long accountRecordId = Strings.isNullOrEmpty(accountIdStr) ? null : recordIdApi.getRecordId(UUID.fromString(accountIdStr), ObjectType.ACCOUNT, tenantContext);
// Limit search results by default
- final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusMonths(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+ final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusDays(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+ final DateTime maxDate = Strings.isNullOrEmpty(maxDateOrNull) ? clock.getUTCNow().plusDays(2) : DATE_TIME_FORMATTER.parseDateTime(maxDateOrNull).toDateTime(DateTimeZone.UTC);
final StreamingOutput json = new StreamingOutput() {
@Override
@@ -174,7 +171,7 @@ public class AdminResource extends JaxRsResourceBase {
if (withBusEvents) {
generator.writeFieldName("busEvents");
generator.writeStartArray();
- for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
+ for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId)) {
generator.writeObject(new BusEventWithRichMetadata(busEvent));
}
generator.writeEndArray();
@@ -183,7 +180,7 @@ public class AdminResource extends JaxRsResourceBase {
if (withNotifications) {
generator.writeFieldName("notifications");
generator.writeStartArray();
- for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
+ for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId)) {
generator.writeObject(notification);
}
generator.writeEndArray();
@@ -391,9 +388,10 @@ public class AdminResource extends JaxRsResourceBase {
final boolean includeInProcessing,
final boolean includeHistory,
@Nullable final DateTime minEffectiveDate,
+ @Nullable final DateTime maxEffectiveDate,
@Nullable final Long accountRecordId,
final Long tenantRecordId) {
- final Collection<NotificationEventWithMetadata<NotificationEvent>> notifications = new LinkedList<NotificationEventWithMetadata<NotificationEvent>>();
+ Iterable<NotificationEventWithMetadata<NotificationEvent>> notifications = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>of();
for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
if (queueName != null && !queueName.equals(notificationQueue.getQueueName())) {
continue;
@@ -401,75 +399,76 @@ public class AdminResource extends JaxRsResourceBase {
continue;
}
- final List<NotificationEventWithMetadata<NotificationEvent>> notificationsForQueue;
if (includeInProcessing) {
if (accountRecordId != null) {
- notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId);
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+ notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId));
} else {
- notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId);
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+ notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
}
} else {
if (accountRecordId != null) {
- notificationsForQueue = notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId);
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+ notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId));
} else {
- notificationsForQueue = notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId);
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+ notificationQueue.getFutureNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
}
}
- notifications.addAll(notificationsForQueue);
-
if (includeHistory) {
if (accountRecordId != null) {
- notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId));
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId),
+ notifications);
} else {
- notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId));
+ notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId),
+ notifications);
}
}
}
- return Ordering.<NotificationEventWithMetadata<NotificationEvent>>from(new Comparator<NotificationEventWithMetadata<NotificationEvent>>() {
- @Override
- public int compare(final NotificationEventWithMetadata<NotificationEvent> o1, final NotificationEventWithMetadata<NotificationEvent> o2) {
- final int effectiveDateComparison = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
- return effectiveDateComparison == 0 ? o1.getRecordId().compareTo(o2.getRecordId()) : effectiveDateComparison;
- }
- }).sortedCopy(notifications);
+ // Note: entries are properly ordered by queue, but not cross queues unfortunately
+ return notifications;
}
private Iterable<BusEventWithMetadata<BusEvent>> getBusEvents(final boolean includeInProcessing,
final boolean includeHistory,
@Nullable final DateTime minCreatedDate,
+ @Nullable final DateTime maxCreatedDate,
@Nullable final Long accountRecordId,
final Long tenantRecordId) {
- final Collection<BusEventWithMetadata<BusEvent>> busEvents = new LinkedList<BusEventWithMetadata<BusEvent>>();
+ Iterable<BusEventWithMetadata<BusEvent>> busEvents = ImmutableList.<BusEventWithMetadata<BusEvent>>of();
if (includeInProcessing) {
if (accountRecordId != null) {
- busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+ persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
} else {
- busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+ persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
}
} else {
if (accountRecordId != null) {
- busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+ persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
} else {
- busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+ persistentBus.getAvailableBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
}
}
if (includeHistory) {
if (accountRecordId != null) {
- busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId),
+ busEvents);
} else {
- busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId));
+ busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId),
+ busEvents);
}
}
- return Ordering.<BusEventWithMetadata<BusEvent>>from(new Comparator<BusEventWithMetadata<BusEvent>>() {
- @Override
- public int compare(final BusEventWithMetadata<BusEvent> o1, final BusEventWithMetadata<BusEvent> o2) {
- return o1.getRecordId().compareTo(o2.getRecordId());
- }
- }).sortedCopy(busEvents);
+ // Note: entries are properly ordered by queue, but not cross queues unfortunately
+ return busEvents;
}
private class BusEventWithRichMetadata extends BusEventWithMetadata<BusEvent> {
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
index 9e1f704..261238a 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -18,9 +18,6 @@
package org.killbill.billing.jaxrs.resources;
-import java.util.Collection;
-import java.util.List;
-
import javax.inject.Inject;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
@@ -61,9 +58,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -237,30 +232,27 @@ public class TestResource extends JaxRsResourceBase {
}
private boolean areAllNotificationsProcessed(final Long tenantRecordId) {
- final Collection<NotificationQueue> filtered = ImmutableList.<NotificationQueue>copyOf(Collections2.<NotificationQueue>filter(notificationQueueService.getNotificationQueues(),
- new Predicate<NotificationQueue>() {
- @Override
- public boolean apply(final NotificationQueue notificationQueue) {
- for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId)) {
- if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
- return true;
- }
- }
- return false;
- }
- }));
- if (!filtered.isEmpty()) {
- log.info("TestResource: {} queue(s) with more notification(s) to process", filtered.size());
+ int nbNotifications = 0;
+ for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
+ for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(null, tenantRecordId)) {
+ if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
+ nbNotifications += 1;
+ }
+ }
+ }
+ if (nbNotifications != 0) {
+ log.info("TestResource: {} queue(s) with more notification(s) to process", nbNotifications);
}
- return filtered.isEmpty();
+ return nbNotifications == 0;
}
private boolean areAllBusEventsProcessed(final Long tenantRecordId) {
- final List<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId);
- if (!availableBusEventForSearchKey2.isEmpty()) {
- log.info("TestResource: at least {} more bus event(s) to process", availableBusEventForSearchKey2.size());
+ final Iterable<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(null, tenantRecordId);
+ final int nbBusEvents = Iterables.<BusEventWithMetadata<BusEvent>>size(availableBusEventForSearchKey2);
+ if (nbBusEvents != 0) {
+ log.info("TestResource: at least {} more bus event(s) to process", nbBusEvents);
}
- return availableBusEventForSearchKey2.isEmpty();
+ return nbBusEvents == 0;
}
private ClockMock getClockMock() {
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
index 463d08c..d06c167 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
package org.killbill.billing.overdue.notification;
-import java.util.Collection;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -66,8 +65,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
// Check if we already have notifications for that key
final Class<T> clazz = (Class<T>) notificationKey.getClass();
- final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
- clazz, context);
+ final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
+ clazz, context);
final boolean shouldInsertNewNotification = cleanupFutureNotificationsFormTransaction(entitySqlDaoWrapperFactory, futureNotifications, futureNotificationTime, overdueQueue);
if (shouldInsertNewNotification) {
@@ -92,8 +91,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
- final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
- clazz, context);
+ final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
+ clazz, context);
for (final NotificationEventWithMetadata<T> notification : futureNotifications) {
checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), notification.getRecordId());
}
@@ -107,15 +106,15 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
}
@VisibleForTesting
- <T extends OverdueCheckNotificationKey> Collection<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
- final NotificationQueue checkOverdueQueue,
- final Class<T> clazz,
- final InternalCallContext context) {
+ <T extends OverdueCheckNotificationKey> Iterable<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+ final NotificationQueue checkOverdueQueue,
+ final Class<T> clazz,
+ final InternalCallContext context) {
return checkOverdueQueue.getFutureNotificationFromTransactionForSearchKeys(context.getAccountRecordId(), context.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
}
protected abstract <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
- final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+ final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
final DateTime futureNotificationTime, final NotificationQueue overdueQueue);
}
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
index d6b7a44..124fe69 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
package org.killbill.billing.overdue.notification;
-import java.util.Collection;
-
import org.joda.time.DateTime;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -31,6 +29,7 @@ import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
import org.skife.jdbi.v2.IDBI;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
@@ -44,12 +43,13 @@ public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
@Override
protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
- final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+ final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
final DateTime futureNotificationTime,
final NotificationQueue overdueQueue) {
// If we already have notification for that account we don't insert the new one
// Note that this is slightly incorrect because we could for instance already have a REFRESH and insert a CLEAR, but if that were the case,
// if means overdue state would change very rapidly and the behavior would anyway be non deterministic
- return futureNotifications.isEmpty();
+ // Note: don't use isEmpty() to go through all results to close the connection
+ return Iterables.<NotificationEventWithMetadata<T>>size(futureNotifications) == 0;
}
}
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
index 1119e53..25d7eff 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
package org.killbill.billing.overdue.notification;
-import java.util.Collection;
-
import org.joda.time.DateTime;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -44,32 +42,31 @@ public class OverdueCheckPoster extends DefaultOverduePosterBase {
@Override
protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
- final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+ final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
final DateTime futureNotificationTime, final NotificationQueue overdueQueue) {
boolean shouldInsertNewNotification = true;
- if (!futureNotifications.isEmpty()) {
+ int minIndexToDeleteFrom = 0;
+ int index = 0;
+ for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
// Results are ordered by effective date asc
- final DateTime earliestExistingNotificationDate = futureNotifications.iterator().next().getEffectiveDate();
-
- final int minIndexToDeleteFrom;
- if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
- // We don't have to insert a new one. For sanity, delete any other future notification
- minIndexToDeleteFrom = 1;
- shouldInsertNewNotification = false;
- } else {
- // We win - we are before any other already recorded. Delete all others.
- minIndexToDeleteFrom = 0;
+ if (index == 0) {
+ if (cur.getEffectiveDate().isBefore(futureNotificationTime)) {
+ // We don't have to insert a new one. For sanity, delete any other future notification
+ minIndexToDeleteFrom = 1;
+ shouldInsertNewNotification = false;
+ } else {
+ // We win - we are before any other already recorded. Delete all others.
+ minIndexToDeleteFrom = 0;
+ }
}
- int index = 0;
- for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
- if (minIndexToDeleteFrom <= index) {
- overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
- }
- index++;
+ if (minIndexToDeleteFrom <= index) {
+ overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
}
+ index++;
}
+
return shouldInsertNewNotification;
}
}
diff --git a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
index 03e91fa..3f8cd00 100644
--- a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -17,24 +19,26 @@
package org.killbill.billing.overdue.notification;
import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
import org.killbill.billing.account.api.Account;
-import org.killbill.notificationq.api.NotificationEventWithMetadata;
-import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
import org.killbill.billing.overdue.service.DefaultOverdueService;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import org.killbill.billing.util.jackson.ObjectMapper;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
@@ -68,7 +72,7 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
insertOverdueCheckAndVerifyQueueContent(overdueable, 15, 5);
// Verify the final content of the queue
- Assert.assertEquals(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size(), 1);
+ Assert.assertEquals(Iterables.<NotificationEventWithMetadata>size(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())), 1);
}
private void insertOverdueCheckAndVerifyQueueContent(final Account account, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
@@ -77,18 +81,18 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(account.getId());
checkPoster.insertOverdueNotification(account.getId(), futureNotificationTime, OverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE, notificationKey, internalCallContext);
- final Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
+ final List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
Assert.assertEquals(notificationsForKey.size(), 1);
- final NotificationEventWithMetadata nm = notificationsForKey.iterator().next();
+ final NotificationEventWithMetadata nm = notificationsForKey.get(0);
Assert.assertEquals(nm.getEvent(), notificationKey);
Assert.assertEquals(nm.getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
}
- private Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
- return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
+ private List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
+ return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<List<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
@Override
- public Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
- return ((OverdueCheckPoster)checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext);
+ public List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+ return ImmutableList.<NotificationEventWithMetadata<OverdueCheckNotificationKey>>copyOf(((OverdueCheckPoster) checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext));
}
});
}
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index f4a0491..b35e99a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -408,14 +408,14 @@ public class PaymentProcessor extends ProcessorBase {
public void cancelScheduledPaymentTransaction(final UUID lastPaymentAttemptId, final InternalCallContext internalCallContext) throws PaymentApiException {
try {
final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
- final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+ final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
retryQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
if (((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId().equals(lastPaymentAttemptId)) {
retryQueue.removeNotification(notificationEvent.getRecordId());
- break;
}
+ // Go through all results to close the connection
}
} catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
log.error("ERROR Loading Notification Queue - " + noSuchNotificationQueue.getMessage());
@@ -767,7 +767,7 @@ public class PaymentProcessor extends ProcessorBase {
// Get Future Payment Attempts from Notification Queue and add them to the list
try {
final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
- final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+ final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
retryQueue.getFutureNotificationForSearchKeys(internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
index 5a137ac..191ed5c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
@@ -1,6 +1,6 @@
/*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -40,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuiteWithEmbeddedDB {
@@ -77,7 +78,7 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
final JanitorNotificationKey notificationKey = new JanitorNotificationKey(transactionId, incompletePaymentTransactionTask.getClass().toString(), 1);
final UUID userToken = UUID.randomUUID();
- Assert.assertTrue(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).isEmpty());
+ Assert.assertTrue(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())));
GlobalLock lock = null;
try {
@@ -85,16 +86,17 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
incompletePaymentTransactionTask.processNotification(notificationKey, userToken, internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
- final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
- Assert.assertFalse(futureNotifications.isEmpty());
- Assert.assertEquals(futureNotifications.get(0).getUserToken(), userToken);
- Assert.assertEquals(futureNotifications.get(0).getEvent().getClass(), JanitorNotificationKey.class);
- final JanitorNotificationKey event = (JanitorNotificationKey) futureNotifications.get(0).getEvent();
+ final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ Assert.assertFalse(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(futureNotifications));
+ final NotificationEventWithMetadata<NotificationEvent> notificationEventWithMetadata = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>copyOf(futureNotifications).get(0);
+ Assert.assertEquals(notificationEventWithMetadata.getUserToken(), userToken);
+ Assert.assertEquals(notificationEventWithMetadata.getEvent().getClass(), JanitorNotificationKey.class);
+ final JanitorNotificationKey event = (JanitorNotificationKey) notificationEventWithMetadata.getEvent();
Assert.assertEquals(event.getUuidKey(), transactionId);
Assert.assertEquals((int) event.getAttemptNumber(), 2);
// Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
- Assert.assertTrue(futureNotifications.get(0).getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
+ Assert.assertTrue(notificationEventWithMetadata.getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
} catch (final LockFailedException e) {
Assert.fail();
} finally {
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index b8d044b..0fcfb7f 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -1,6 +1,6 @@
/*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -71,6 +71,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import static com.jayway.awaitility.Awaitility.await;
@@ -508,12 +509,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
+ boolean completed = true;
for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())) {
if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
- return false;
+ completed = false;
}
+ // Go through all results to close the connection
}
- return true;
+ return completed;
}
});
} catch (final Exception e) {
@@ -523,7 +526,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
private int getPendingNotificationCnt(final InternalCallContext internalCallContext) {
try {
- return notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size();
+ return Iterables.<NotificationEventWithMetadata>size(notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
} catch (final Exception e) {
fail("Test failed ", e);
}
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
index c72cf42..ccc67eb 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -731,7 +731,7 @@ public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implemen
try {
final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultSubscriptionBaseService.SUBSCRIPTION_SERVICE_NAME,
DefaultSubscriptionBaseService.NOTIFICATION_QUEUE_NAME);
- final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+ final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
return Iterables.transform(futureNotifications, new Function<NotificationEventWithMetadata<NotificationEvent>, DateTime>() {
@Nullable
@Override