killbill-uncached

util: improve TestApiListener synchronization Make sure

1/20/2016 1:29:23 AM

Details

diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 9f28aa5..a844636 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -25,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
-import org.joda.time.DateTime;
 import org.killbill.billing.events.BlockingTransitionInternalEvent;
 import org.killbill.billing.events.BroadcastInternalEvent;
 import org.killbill.billing.events.CustomFieldEvent;
@@ -291,31 +292,35 @@ public class TestApiListener {
             long waitTimeMs = timeout;
             do {
                 try {
-                    final DateTime before = new DateTime();
-                    wait(500);
+                    final long before = System.currentTimeMillis();
+                    wait(100);
                     if (completed) {
                         // TODO PIERRE Kludge alert!
                         // When we arrive here, we got notified by the current thread (Bus listener) that we received
                         // all expected events. But other handlers might still be processing them.
                         // Since there is only one bus thread, and that the test thread waits for all events to be processed,
                         // we're guaranteed that all are processed when the bus events table is empty.
+                        // We also need to wait for in-processing notifications (see https://github.com/killbill/killbill/issues/475).
+                        // This is really similar to TestResource#waitForNotificationToComplete.
                         await().atMost(timeout, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
                             @Override
                             public Boolean call() throws Exception {
-                                final long inProcessingBusEvents = idbi.withHandle(new HandleCallback<Long>() {
+                                final long inProcessing = idbi.withHandle(new HandleCallback<Long>() {
                                     @Override
                                     public Long withHandle(final Handle handle) throws Exception {
-                                        return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count");
+                                        return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
+                                               // We assume all ready notifications have been picked up
+                                               (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
                                     }
                                 });
-                                log.debug("Events still in processing: " + inProcessingBusEvents);
-                                return inProcessingBusEvents == 0;
+                                log.debug("Events still in processing: {}", inProcessing);
+                                return inProcessing == 0;
                             }
                         });
                         return completed;
                     }
-                    final DateTime after = new DateTime();
-                    waitTimeMs -= after.getMillis() - before.getMillis();
+                    final long after = System.currentTimeMillis();
+                    waitTimeMs -= (after - before);
                 } catch (final Exception ignore) {
                     final StringBuilder errorBuilder = new StringBuilder("isCompleted got interrupted. Exception: ").append(ignore)
                                                                                                                     .append("\nRemaining bus events:\n");