killbill-memoizeit

beatrix: Harden logic in TestApiListener isCompleted method

5/5/2016 1:56:13 PM

Details

diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 90d142d..cff1e27 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -43,6 +43,7 @@ import org.killbill.billing.events.PaymentInfoInternalEvent;
 import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
 import org.killbill.billing.events.TagDefinitionInternalEvent;
 import org.killbill.billing.events.TagInternalEvent;
+import org.killbill.clock.Clock;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.tweak.HandleCallback;
@@ -67,6 +68,7 @@ public class TestApiListener {
 
     private final List<NextEvent> nextExpectedEvent;
     private final IDBI idbi;
+    private final Clock clock;
 
     private boolean isListenerFailed = false;
     private String listenerFailedMsg;
@@ -74,10 +76,11 @@ public class TestApiListener {
     private volatile boolean completed;
 
     @Inject
-    public TestApiListener(final IDBI idbi) {
+    public TestApiListener(final IDBI idbi, final Clock clock) {
         nextExpectedEvent = new Stack<NextEvent>();
         this.completed = false;
         this.idbi = idbi;
+        this.clock = clock;
     }
 
     public void assertListenerStatus() {
@@ -330,16 +333,9 @@ public class TestApiListener {
                         await().atMost(timeout, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
                             @Override
                             public Boolean call() throws Exception {
-                                final long inProcessing = idbi.withHandle(new HandleCallback<Long>() {
-                                    @Override
-                                    public Long withHandle(final Handle handle) throws Exception {
-                                        return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
-                                               // We assume all ready notifications have been picked up
-                                               (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
-                                    }
-                                });
-                                log.debug("Events still in processing: {}", inProcessing);
-                                return inProcessing == 0;
+                                final long pending = idbi.withHandle(new PendingBusOrNotificationCallback(clock));
+                                log.debug("Events still in processing: {}", pending);
+                                return pending == 0;
                             }
                         });
                         return completed;
@@ -347,19 +343,9 @@ public class TestApiListener {
                     final long after = System.currentTimeMillis();
                     waitTimeMs -= (after - before);
                 } catch (final Exception ignore) {
-                    final StringBuilder errorBuilder = new StringBuilder("isCompleted got interrupted. Exception: ").append(ignore)
-                                                                                                                    .append("\nRemaining bus events:\n");
-                    idbi.withHandle(new HandleCallback<Void>() {
-                        @Override
-                        public Void withHandle(final Handle handle) throws Exception {
-                            final List<Map<String, Object>> busEvents = handle.select("select * from bus_events");
-                            for (final Map<String, Object> busEvent : busEvents) {
-                                errorBuilder.append(busEvent).append("\n");
-                            }
-                            return null;
-                        }
-                    });
-                    log.error(errorBuilder.toString());
+                    // Rerun one more time to provide details
+                    final long pending = idbi.withHandle(new PendingBusOrNotificationCallback(clock));
+                    log.error("isCompleted : Received all events but found remaining unprocessed bus events/notifications =  {}", pending);
                     return false;
                 }
             } while (waitTimeMs > 0 && !completed);
@@ -369,10 +355,24 @@ public class TestApiListener {
             final Joiner joiner = Joiner.on(" ");
             log.error("TestApiListener did not complete in " + timeout + " ms, remaining events are " + joiner.join(nextExpectedEvent));
         }
-
         return completed;
     }
 
+    private static class PendingBusOrNotificationCallback implements HandleCallback<Long> {
+
+        private final Clock clock;
+
+        public PendingBusOrNotificationCallback(final Clock clock) {
+            this.clock = clock;
+        }
+        @Override
+        public Long withHandle(final Handle handle) throws Exception {
+            return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
+                   (Long) handle.select("select count(distinct record_id) count from notifications where effective_date < ?", clock.getUTCNow().toDate()).get(0).get("count") +
+                   (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
+        }
+    }
+
     private void notifyIfStackEmpty() {
         log.debug("TestApiListener notifyIfStackEmpty ENTER");
         synchronized (this) {