killbill-aplcache

Details

diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3c19a2b..97d3bf9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,13 +38,13 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected void doProcessEvents(final int sequenceId) {
+    protected boolean doProcessEvents(final int sequenceId) {
 
         logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
-            return;
+            return false;
         }
 
         logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
@@ -58,6 +58,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             logDebug("done handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
+        return true;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4ea38f7..4826356 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,8 +38,9 @@ public interface NotificationQueue {
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
+    * @return true if we processed some active notifications
     */
-   public void processReadyNotification();
+   public boolean processReadyNotification();
 
    /**
     * Stops the queue. Blocks until queue is completely stopped.
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 9a42d2e..15679f6 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -57,10 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     // Use this object's monitor for synchronization (no need for volatile)
     protected boolean isProcessingEvents;
-    
+
     private boolean startedComplete = false;
     private boolean stoppedComplete = false;
-    
+
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         this.clock = clock;
@@ -88,8 +88,8 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
 
     @Override
-    public void processReadyNotification() {
-        doProcessEvents(sequenceId.incrementAndGet());
+    public boolean processReadyNotification() {
+        return doProcessEvents(sequenceId.incrementAndGet());
     }
 
 
@@ -181,14 +181,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         });
         waitForNotificationStartCompletion();
     }
-    
+
     private void completedQueueStop() {
     	synchronized (this) {
     		stoppedComplete = true;
             this.notifyAll();
         }
     }
-    
+
     private void completedQueueStart() {
         synchronized (this) {
         	startedComplete = true;
@@ -237,5 +237,5 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract void doProcessEvents(int sequenceId);
+    protected abstract boolean doProcessEvents(int sequenceId);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 5cb00aa..ee04781 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -76,7 +76,7 @@ public interface NotificationQueueService {
     /**
      *
      * @param services
-     * @return
+     * @return whether or not things were ready in the queue
      */
-    public void triggerManualQueueProcessing(final String [] services);
+    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 7833529..85d92c9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -83,8 +83,13 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
     }
 
 
+    //
+    // Test ONLY
+    //
     @Override
-    public void triggerManualQueueProcessing(final String [] services) {
+    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+
+        boolean result = false;
 
         List<NotificationQueue> manualQueues = null;
         if (services == null) {
@@ -102,8 +107,15 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
             }
         }
         for (NotificationQueue cur : manualQueues) {
-            cur.processReadyNotification();
+            boolean processedNotifications = true;
+            do {
+                processedNotifications = cur.processReadyNotification();
+                if (result == false) {
+                    result = processedNotifications;
+                }
+            } while(keepRunning && processedNotifications);
         }
+        return result;
     }
 
     private final void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e1da366..922fb29 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,7 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected void doProcessEvents(int sequenceId) {
+    protected boolean doProcessEvents(int sequenceId) {
+
+        boolean result = false;
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
         List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -73,7 +75,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                     readyNotifications.add(cur);
                 }
             }
+
+            result = readyNotifications.size() > 0;
             for (Notification cur : readyNotifications) {
+
                 handler.handleReadyNotification(cur.getNotificationKey());
                 DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
                 oldNotifications.add(cur);
@@ -87,6 +92,6 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 notifications.addAll(processedNotifications);
             }
         }
-
+        return result;
     }
 }