diff --git a/util/src/test/java/org/killbill/billing/api/TestApiListener.java b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
index 90d142d..cff1e27 100644
--- a/util/src/test/java/org/killbill/billing/api/TestApiListener.java
+++ b/util/src/test/java/org/killbill/billing/api/TestApiListener.java
@@ -43,6 +43,7 @@ import org.killbill.billing.events.PaymentInfoInternalEvent;
import org.killbill.billing.events.PaymentPluginErrorInternalEvent;
import org.killbill.billing.events.TagDefinitionInternalEvent;
import org.killbill.billing.events.TagInternalEvent;
+import org.killbill.clock.Clock;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
@@ -67,6 +68,7 @@ public class TestApiListener {
private final List<NextEvent> nextExpectedEvent;
private final IDBI idbi;
+ private final Clock clock;
private boolean isListenerFailed = false;
private String listenerFailedMsg;
@@ -74,10 +76,11 @@ public class TestApiListener {
private volatile boolean completed;
@Inject
- public TestApiListener(final IDBI idbi) {
+ public TestApiListener(final IDBI idbi, final Clock clock) {
nextExpectedEvent = new Stack<NextEvent>();
this.completed = false;
this.idbi = idbi;
+ this.clock = clock;
}
public void assertListenerStatus() {
@@ -330,16 +333,9 @@ public class TestApiListener {
await().atMost(timeout, TimeUnit.MILLISECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- final long inProcessing = idbi.withHandle(new HandleCallback<Long>() {
- @Override
- public Long withHandle(final Handle handle) throws Exception {
- return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
- // We assume all ready notifications have been picked up
- (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
- }
- });
- log.debug("Events still in processing: {}", inProcessing);
- return inProcessing == 0;
+ final long pending = idbi.withHandle(new PendingBusOrNotificationCallback(clock));
+ log.debug("Events still in processing: {}", pending);
+ return pending == 0;
}
});
return completed;
@@ -347,19 +343,9 @@ public class TestApiListener {
final long after = System.currentTimeMillis();
waitTimeMs -= (after - before);
} catch (final Exception ignore) {
- final StringBuilder errorBuilder = new StringBuilder("isCompleted got interrupted. Exception: ").append(ignore)
- .append("\nRemaining bus events:\n");
- idbi.withHandle(new HandleCallback<Void>() {
- @Override
- public Void withHandle(final Handle handle) throws Exception {
- final List<Map<String, Object>> busEvents = handle.select("select * from bus_events");
- for (final Map<String, Object> busEvent : busEvents) {
- errorBuilder.append(busEvent).append("\n");
- }
- return null;
- }
- });
- log.error(errorBuilder.toString());
+ // Rerun one more time to provide details
+ final long pending = idbi.withHandle(new PendingBusOrNotificationCallback(clock));
+ log.error("isCompleted : Received all events but found remaining unprocessed bus events/notifications = {}", pending);
return false;
}
} while (waitTimeMs > 0 && !completed);
@@ -369,10 +355,24 @@ public class TestApiListener {
final Joiner joiner = Joiner.on(" ");
log.error("TestApiListener did not complete in " + timeout + " ms, remaining events are " + joiner.join(nextExpectedEvent));
}
-
return completed;
}
+ private static class PendingBusOrNotificationCallback implements HandleCallback<Long> {
+
+ private final Clock clock;
+
+ public PendingBusOrNotificationCallback(final Clock clock) {
+ this.clock = clock;
+ }
+ @Override
+ public Long withHandle(final Handle handle) throws Exception {
+ return (Long) handle.select("select count(distinct record_id) count from bus_events").get(0).get("count") +
+ (Long) handle.select("select count(distinct record_id) count from notifications where effective_date < ?", clock.getUTCNow().toDate()).get(0).get("count") +
+ (Long) handle.select("select count(distinct record_id) count from notifications where processing_state = 'IN_PROCESSING'").get(0).get("count");
+ }
+ }
+
private void notifyIfStackEmpty() {
log.debug("TestApiListener notifyIfStackEmpty ENTER");
synchronized (this) {