killbill-memoizeit

jaxrs: improve waiting for notifications in TestResource This

2/12/2015 12:06:21 PM

Details

diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
index 2801eed..f16acf3 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2015 Groupon, Inc
+ * Copyright 2014-2015 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -16,10 +18,11 @@
 
 package org.killbill.billing.jaxrs.resources;
 
+import java.util.Collection;
 import java.util.List;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.servlet.ServletRequest;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -44,8 +47,13 @@ import org.killbill.billing.util.api.CustomFieldUserApi;
 import org.killbill.billing.util.api.RecordIdApi;
 import org.killbill.billing.util.api.TagUserApi;
 import org.killbill.billing.util.callcontext.TenantContext;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.BusEventWithMetadata;
+import org.killbill.bus.api.PersistentBus;
 import org.killbill.clock.Clock;
 import org.killbill.clock.ClockMock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
 import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.slf4j.Logger;
@@ -54,7 +62,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Collections2;
 import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiResponse;
@@ -78,15 +86,17 @@ public class TestResource extends JaxRsResourceBase {
     private static final Logger log = LoggerFactory.getLogger(TestResource.class);
     private static final int MILLIS_IN_SEC = 1000;
 
+    private final PersistentBus persistentBus;
     private final NotificationQueueService notificationQueueService;
     private final RecordIdApi recordIdApi;
 
     @Inject
     public TestResource(final JaxrsUriBuilder uriBuilder, final TagUserApi tagUserApi, final CustomFieldUserApi customFieldUserApi,
                         final AuditUserApi auditUserApi, final AccountUserApi accountUserApi, final RecordIdApi recordIdApi,
-                        final NotificationQueueService notificationQueueService, final PaymentApi paymentApi,
+                        final PersistentBus persistentBus, final NotificationQueueService notificationQueueService, final PaymentApi paymentApi,
                         final Clock clock, final Context context) {
         super(uriBuilder, tagUserApi, customFieldUserApi, auditUserApi, accountUserApi, paymentApi, clock, context);
+        this.persistentBus = persistentBus;
         this.notificationQueueService = notificationQueueService;
         this.recordIdApi = recordIdApi;
     }
@@ -121,6 +131,16 @@ public class TestResource extends JaxRsResourceBase {
     }
 
     @GET
+    @Path("/queues")
+    @ApiOperation(value = "Wait for all available bus events and notifications to be processed")
+    @ApiResponses(value = {@ApiResponse(code = 412, message = "Timeout too short")})
+    public Response waitForQueuesToComplete(@QueryParam("timeoutSec") @DefaultValue("5") final Long timeoutSec,
+                                            @javax.ws.rs.core.Context final HttpServletRequest request) {
+        final boolean areAllNotificationsProcessed = waitForNotificationToComplete(request, timeoutSec);
+        return Response.status(areAllNotificationsProcessed ? Status.OK : Status.PRECONDITION_FAILED).build();
+    }
+
+    @GET
     @Path("/clock")
     @Produces(APPLICATION_JSON)
     @ApiOperation(value = "Get the current time", response = ClockResource.class)
@@ -185,16 +205,20 @@ public class TestResource extends JaxRsResourceBase {
         return getCurrentTime(timeZoneStr);
     }
 
-    private void waitForNotificationToComplete(final HttpServletRequest request, final Long timeoutSec) {
+    private boolean waitForNotificationToComplete(final ServletRequest request, final Long timeoutSec) {
         final TenantContext tenantContext = context.createContext(request);
         final Long tenantRecordId = recordIdApi.getRecordId(tenantContext.getTenantId(), ObjectType.TENANT, tenantContext);
-        final List<NotificationQueue> queues = notificationQueueService.getNotificationQueues();
 
         int nbTryLeft = timeoutSec != null ? timeoutSec.intValue() : 0;
+        boolean areAllNotificationsProcessed = false;
         try {
-            boolean areAllNotificationsProcessed = false;
             while (!areAllNotificationsProcessed && nbTryLeft > 0) {
-                areAllNotificationsProcessed = areAllNotificationsProcessed(queues, tenantRecordId);
+                areAllNotificationsProcessed = areAllNotificationsProcessed(tenantRecordId);
+                // Processing of notifications may have triggered bus events, which may trigger other notifications
+                // effective immediately. Hence, we need to make sure all bus events have been processed too.
+                areAllNotificationsProcessed = areAllNotificationsProcessed && areAllBusEventsProcessed(tenantRecordId);
+                // We do a re-check of the notification queues in case of race conditions.
+                areAllNotificationsProcessed = areAllNotificationsProcessed && areAllNotificationsProcessed(tenantRecordId);
                 if (!areAllNotificationsProcessed) {
                     Thread.sleep(MILLIS_IN_SEC);
                     nbTryLeft--;
@@ -202,16 +226,39 @@ public class TestResource extends JaxRsResourceBase {
             }
         } catch (final InterruptedException ignore) {
         }
+
+        if (!areAllNotificationsProcessed) {
+            log.warn("TestResource: there are more notifications or bus events to process, consider increasing the timeout (currently {}s)", timeoutSec);
+        }
+
+        return areAllNotificationsProcessed;
     }
 
-    private boolean areAllNotificationsProcessed(final Iterable<NotificationQueue> queues, final Long tenantRecordId) {
-        final Iterable<NotificationQueue> filtered = Iterables.filter(queues, new Predicate<NotificationQueue>() {
-            @Override
-            public boolean apply(@Nullable final NotificationQueue input) {
-                return input.getFutureNotificationForSearchKey2(tenantRecordId).size() > 0;
-            }
-        });
-        return !filtered.iterator().hasNext();
+    private boolean areAllNotificationsProcessed(final Long tenantRecordId) {
+        final Collection<NotificationQueue> filtered = Collections2.<NotificationQueue>filter(notificationQueueService.getNotificationQueues(),
+                                                                                              new Predicate<NotificationQueue>() {
+                                                                                                  @Override
+                                                                                                  public boolean apply(final NotificationQueue notificationQueue) {
+                                                                                                      for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId)) {
+                                                                                                          if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
+                                                                                                              return true;
+                                                                                                          }
+                                                                                                      }
+                                                                                                      return false;
+                                                                                                  }
+                                                                                              });
+        if (!filtered.isEmpty()) {
+            log.info("TestResource: {} more notification(s) to process", filtered.size());
+        }
+        return filtered.isEmpty() && notificationQueueService.inProcessingNotificationsCount() == 0;
+    }
+
+    private boolean areAllBusEventsProcessed(final Long tenantRecordId) {
+        final List<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId);
+        if (!availableBusEventForSearchKey2.isEmpty()) {
+            log.info("TestResource: at least {} more bus event(s) to process", availableBusEventForSearchKey2.size());
+        }
+        return availableBusEventForSearchKey2.isEmpty() && persistentBus.inProcessingBusEventsCount() == 0;
     }
 
     private ClockMock getClockMock() {