diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
index 2801eed..f16acf3 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -16,10 +18,11 @@
package org.killbill.billing.jaxrs.resources;
+import java.util.Collection;
import java.util.List;
-import javax.annotation.Nullable;
import javax.inject.Inject;
+import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -44,8 +47,13 @@ import org.killbill.billing.util.api.CustomFieldUserApi;
import org.killbill.billing.util.api.RecordIdApi;
import org.killbill.billing.util.api.TagUserApi;
import org.killbill.billing.util.callcontext.TenantContext;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.BusEventWithMetadata;
+import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
import org.killbill.clock.ClockMock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
import org.slf4j.Logger;
@@ -54,7 +62,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Collections2;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiResponse;
@@ -78,15 +86,17 @@ public class TestResource extends JaxRsResourceBase {
private static final Logger log = LoggerFactory.getLogger(TestResource.class);
private static final int MILLIS_IN_SEC = 1000;
+ private final PersistentBus persistentBus;
private final NotificationQueueService notificationQueueService;
private final RecordIdApi recordIdApi;
@Inject
public TestResource(final JaxrsUriBuilder uriBuilder, final TagUserApi tagUserApi, final CustomFieldUserApi customFieldUserApi,
final AuditUserApi auditUserApi, final AccountUserApi accountUserApi, final RecordIdApi recordIdApi,
- final NotificationQueueService notificationQueueService, final PaymentApi paymentApi,
+ final PersistentBus persistentBus, final NotificationQueueService notificationQueueService, final PaymentApi paymentApi,
final Clock clock, final Context context) {
super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, accountUserApi, paymentApi, clock, context);
+ this.persistentBus = persistentBus;
this.notificationQueueService = notificationQueueService;
this.recordIdApi = recordIdApi;
}
@@ -121,6 +131,16 @@ public class TestResource extends JaxRsResourceBase {
}
@GET
+ @Path("/queues")
+ @ApiOperation(value = "Wait for all available bus events and notifications to be processed")
+ @ApiResponses(value = {@ApiResponse(code = 412, message = "Timeout too short")})
+ public Response waitForQueuesToComplete(@QueryParam("timeoutSec") @DefaultValue("5") final Long timeoutSec,
+ @javax.ws.rs.core.Context final HttpServletRequest request) {
+ final boolean areAllNotificationsProcessed = waitForNotificationToComplete(request, timeoutSec);
+ return Response.status(areAllNotificationsProcessed ? Status.OK : Status.PRECONDITION_FAILED).build();
+ }
+
+ @GET
@Path("/clock")
@Produces(APPLICATION_JSON)
@ApiOperation(value = "Get the current time", response = ClockResource.class)
@@ -185,16 +205,20 @@ public class TestResource extends JaxRsResourceBase {
return getCurrentTime(timeZoneStr);
}
- private void waitForNotificationToComplete(final HttpServletRequest request, final Long timeoutSec) {
+ private boolean waitForNotificationToComplete(final ServletRequest request, final Long timeoutSec) {
final TenantContext tenantContext = context.createContext(request);
final Long tenantRecordId = recordIdApi.getRecordId(tenantContext.getTenantId(), ObjectType.TENANT, tenantContext);
- final List<NotificationQueue> queues = notificationQueueService.getNotificationQueues();
int nbTryLeft = timeoutSec != null ? timeoutSec.intValue() : 0;
+ boolean areAllNotificationsProcessed = false;
try {
- boolean areAllNotificationsProcessed = false;
while (!areAllNotificationsProcessed && nbTryLeft > 0) {
- areAllNotificationsProcessed = areAllNotificationsProcessed(queues, tenantRecordId);
+ areAllNotificationsProcessed = areAllNotificationsProcessed(tenantRecordId);
+ // Processing of notifications may have triggered bus events, which may trigger other notifications
+ // effective immediately. Hence, we need to make sure all bus events have been processed too.
+ areAllNotificationsProcessed = areAllNotificationsProcessed && areAllBusEventsProcessed(tenantRecordId);
+ // We do a re-check of the notification queues in case of race conditions.
+ areAllNotificationsProcessed = areAllNotificationsProcessed && areAllNotificationsProcessed(tenantRecordId);
if (!areAllNotificationsProcessed) {
Thread.sleep(MILLIS_IN_SEC);
nbTryLeft--;
@@ -202,16 +226,39 @@ public class TestResource extends JaxRsResourceBase {
}
} catch (final InterruptedException ignore) {
}
+
+ if (!areAllNotificationsProcessed) {
+ log.warn("TestResource: there are more notifications or bus events to process, consider increasing the timeout (currently {}s)", timeoutSec);
+ }
+
+ return areAllNotificationsProcessed;
}
- private boolean areAllNotificationsProcessed(final Iterable<NotificationQueue> queues, final Long tenantRecordId) {
- final Iterable<NotificationQueue> filtered = Iterables.filter(queues, new Predicate<NotificationQueue>() {
- @Override
- public boolean apply(@Nullable final NotificationQueue input) {
- return input.getFutureNotificationForSearchKey2(tenantRecordId).size() > 0;
- }
- });
- return !filtered.iterator().hasNext();
+ private boolean areAllNotificationsProcessed(final Long tenantRecordId) {
+ final Collection<NotificationQueue> filtered = Collections2.<NotificationQueue>filter(notificationQueueService.getNotificationQueues(),
+ new Predicate<NotificationQueue>() {
+ @Override
+ public boolean apply(final NotificationQueue notificationQueue) {
+ for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId)) {
+ if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ if (!filtered.isEmpty()) {
+ log.info("TestResource: {} more notification(s) to process", filtered.size());
+ }
+ return filtered.isEmpty() && notificationQueueService.inProcessingNotificationsCount() == 0;
+ }
+
+ private boolean areAllBusEventsProcessed(final Long tenantRecordId) {
+ final List<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId);
+ if (!availableBusEventForSearchKey2.isEmpty()) {
+ log.info("TestResource: at least {} more bus event(s) to process", availableBusEventForSearchKey2.size());
+ }
+ return availableBusEventForSearchKey2.isEmpty() && persistentBus.inProcessingBusEventsCount() == 0;
}
private ClockMock getClockMock() {