diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 9f28aa5..a844636 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2011 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -25,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
-import org.joda.time.DateTime;
import org.killbill.billing.events.BlockingTransitionInternalEvent;
import org.killbill.billing.events.BroadcastInternalEvent;
import org.killbill.billing.events.CustomFieldEvent;
@@ -291,31 +292,35 @@ public class TestApiListener {
long waitTimeMs = timeout;
do {
try {
- final DateTime before = new DateTime();
- wait(500);
+ final long before = System.currentTimeMillis();
+ wait(100);
if (completed) {
// TODO PIERRE Kludge alert!
// When we arrive here, we got notified by the current thread (Bus listener) that we received
// all expected events. But other handlers might still be processing them.
// Since there is only one bus thread, and that the test thread waits for all events to be processed,
// we're guaranteed that all are processed when the bus events table is empty.
+ // We also need to wait for in-processing notifications (see https://github.com/killbill/killbill/issues/475).
+ // This is really similar to TestResource#waitForNotificationToComplete.
await().atMost(timeout, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- final long inProcessingBusEvents = idbi.withHandle(new HandleCallback<Long>() {
+ final long inProcessing = idbi.withHandle(new HandleCallback<Long>() {
@Override
public Long withHandle(final Handle handle) throws Exception {
- return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count");
+ return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
+ // We assume all ready notifications have been picked up
+ (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
}
});
- log.debug("Events still in processing: " + inProcessingBusEvents);
- return inProcessingBusEvents == 0;
+ log.debug("Events still in processing: {}", inProcessing);
+ return inProcessing == 0;
}
});
return completed;
}
- final DateTime after = new DateTime();
- waitTimeMs -= after.getMillis() - before.getMillis();
+ final long after = System.currentTimeMillis();
+ waitTimeMs -= (after - before);
} catch (final Exception ignore) {
final StringBuilder errorBuilder = new StringBuilder("isCompleted got interrupted. Exception: ").append(ignore)
.append("\nRemaining bus events:\n");