killbill-memoizeit

Details

diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 9e4460b..31105c8 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class InMemoryBus implements Bus {
 
-    // STEPH config ?
-    private final static int MAX_EVENT_THREADS = 1;
-
     private final static String EVENT_BUS_IDENTIFIER = "bus-service";
     private final static String EVENT_BUS_GROUP_NAME = "bus-grp";
     private final static String EVENT_BUS_TH_NAME = "bus-th";
@@ -68,7 +65,7 @@ public class InMemoryBus implements Bus {
     public InMemoryBus() {
 
         final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
-        Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
+        Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(group, r, EVENT_BUS_TH_NAME);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 4b4aa89..8e2aaf8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,27 +38,29 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected boolean doProcessEvents(final int sequenceId) {
+    protected int doProcessEvents(final int sequenceId) {
 
         logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
-            return false;
+            return 0;
         }
 
         logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
 
+        int result = 0;
         for (final Notification cur : notifications) {
             nbProcessedEvents.incrementAndGet();
             logDebug("handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
             handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+            result++;
             clearNotification(cur);
             logDebug("done handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
-        return true;
+        return result;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4826356..e1dcdbf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,9 +38,9 @@ public interface NotificationQueue {
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
-    * @return true if we processed some active notifications
+    * @return the number of entries we processed
     */
-   public boolean processReadyNotification();
+   public int processReadyNotification();
 
    /**
     * Stops the queue. Blocks until queue is completely stopped.
@@ -56,4 +56,10 @@ public interface NotificationQueue {
     */
    public void startQueue();
 
+   /**
+    *
+    * @return the name of that queue
+    */
+   public String getFullQName();
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 15679f6..cc1ea28 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -88,7 +88,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
 
     @Override
-    public boolean processReadyNotification() {
+    public int processReadyNotification() {
         return doProcessEvents(sequenceId.incrementAndGet());
     }
 
@@ -233,9 +233,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         }
     }
 
-    protected String getFullQName() {
+    @Override
+    public String getFullQName() {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract boolean doProcessEvents(int sequenceId);
+    protected abstract int doProcessEvents(int sequenceId);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 72816f2..4d56b03 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -78,7 +78,7 @@ public interface NotificationQueueService {
     /**
      *
      * @param services
-     * @return whether or not things were ready in the queue
+     * @return the number of processed notifications
      */
-    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 85d92c9..3f8f26f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -87,9 +87,9 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
     // Test ONLY
     //
     @Override
-    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
 
-        boolean result = false;
+        int result = 0;
 
         List<NotificationQueue> manualQueues = null;
         if (services == null) {
@@ -107,13 +107,12 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
             }
         }
         for (NotificationQueue cur : manualQueues) {
-            boolean processedNotifications = true;
+            int processedNotifications = 0;
             do {
                 processedNotifications = cur.processReadyNotification();
-                if (result == false) {
-                    result = processedNotifications;
-                }
-            } while(keepRunning && processedNotifications);
+                log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
+                result += processedNotifications;
+            } while(keepRunning && processedNotifications > 0);
         }
         return result;
     }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c3eecc0..e96d2cf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,9 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected boolean doProcessEvents(int sequenceId) {
+    protected int doProcessEvents(int sequenceId) {
 
-        boolean result = false;
+        int result = 0;
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
         List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -76,7 +76,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 }
             }
 
-            result = readyNotifications.size() > 0;
+            result = readyNotifications.size();
             for (Notification cur : readyNotifications) {
                 handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
                 DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());