Details
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3c19a2b..97d3bf9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,13 +38,13 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
@Override
- protected void doProcessEvents(final int sequenceId) {
+ protected boolean doProcessEvents(final int sequenceId) {
logDebug("ENTER doProcessEvents");
List<Notification> notifications = getReadyNotifications(sequenceId);
if (notifications.size() == 0) {
logDebug("EXIT doProcessEvents");
- return;
+ return false;
}
logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
@@ -58,6 +58,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
logDebug("done handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
}
+ return true;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4ea38f7..4826356 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,8 +38,9 @@ public interface NotificationQueue {
* This is only valid when the queue has been configured with isNotificationProcessingOff is true
* In which case, it will callback users for all the ready notifications.
*
+ * @return true if we processed some active notifications
*/
- public void processReadyNotification();
+ public boolean processReadyNotification();
/**
* Stops the queue. Blocks until queue is completely stopped.
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 9a42d2e..15679f6 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -57,10 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
// Use this object's monitor for synchronization (no need for volatile)
protected boolean isProcessingEvents;
-
+
private boolean startedComplete = false;
private boolean stoppedComplete = false;
-
+
// Package visibility on purpose
NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
this.clock = clock;
@@ -88,8 +88,8 @@ public abstract class NotificationQueueBase implements NotificationQueue {
@Override
- public void processReadyNotification() {
- doProcessEvents(sequenceId.incrementAndGet());
+ public boolean processReadyNotification() {
+ return doProcessEvents(sequenceId.incrementAndGet());
}
@@ -181,14 +181,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
});
waitForNotificationStartCompletion();
}
-
+
private void completedQueueStop() {
synchronized (this) {
stoppedComplete = true;
this.notifyAll();
}
}
-
+
private void completedQueueStart() {
synchronized (this) {
startedComplete = true;
@@ -237,5 +237,5 @@ public abstract class NotificationQueueBase implements NotificationQueue {
return svcName + ":" + queueName;
}
- protected abstract void doProcessEvents(int sequenceId);
+ protected abstract boolean doProcessEvents(int sequenceId);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 5cb00aa..ee04781 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -76,7 +76,7 @@ public interface NotificationQueueService {
/**
*
* @param services
- * @return
+ * @return whether or not things were ready in the queue
*/
- public void triggerManualQueueProcessing(final String [] services);
+ public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 7833529..85d92c9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -83,8 +83,13 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
+ //
+ // Test ONLY
+ //
@Override
- public void triggerManualQueueProcessing(final String [] services) {
+ public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+
+ boolean result = false;
List<NotificationQueue> manualQueues = null;
if (services == null) {
@@ -102,8 +107,15 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
for (NotificationQueue cur : manualQueues) {
- cur.processReadyNotification();
+ boolean processedNotifications = true;
+ do {
+ processedNotifications = cur.processReadyNotification();
+ if (result == false) {
+ result = processedNotifications;
+ }
+ } while(keepRunning && processedNotifications);
}
+ return result;
}
private final void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e1da366..922fb29 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,7 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- protected void doProcessEvents(int sequenceId) {
+ protected boolean doProcessEvents(int sequenceId) {
+
+ boolean result = false;
List<Notification> processedNotifications = new ArrayList<Notification>();
List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -73,7 +75,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
readyNotifications.add(cur);
}
}
+
+ result = readyNotifications.size() > 0;
for (Notification cur : readyNotifications) {
+
handler.handleReadyNotification(cur.getNotificationKey());
DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
@@ -87,6 +92,6 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
notifications.addAll(processedNotifications);
}
}
-
+ return result;
}
}