killbill-memoizeit

Details

diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
index 6bc9169..d04d768 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestBasic.java
@@ -187,7 +187,7 @@ public class TestBasic {
             public Void inTransaction(Handle h, TransactionStatus status)
                     throws Exception {
                 h.execute("truncate table accounts");
-                h.execute("truncate table events");
+                h.execute("truncate table entitlement_events");
                 h.execute("truncate table subscriptions");
                 h.execute("truncate table bundles");
                 h.execute("truncate table notifications");
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 55ad7f4..dfdc746 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -1,5 +1,5 @@
-DROP TABLE IF EXISTS events;
-CREATE TABLE events (
+DROP TABLE IF EXISTS entitlement_events;
+CREATE TABLE entitlement_events (
     id int(11) unsigned NOT NULL AUTO_INCREMENT,
     event_id char(36) NOT NULL,
     event_type varchar(9) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 704e2c7..10f565d 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -15,14 +15,14 @@ getEventById(event_id) ::= <<
       , plist_name
       , current_version
       , is_active  
-  from events
+  from entitlement_events
   where
       event_id = :event_id
   ;
 >>
 
 insertEvent() ::= <<
-    insert into events (
+    insert into entitlement_events (
       event_id
       , event_type
       , user_type
@@ -54,14 +54,14 @@ insertEvent() ::= <<
 >>
 
 removeEvents(subscription_id) ::= <<
-    delete from events
+    delete from entitlement_events
       where
     subscription_id = :subscription_id
     ;
 >>
 
 unactiveEvent(event_id, now) ::= <<
-    update events
+    update entitlement_events
     set
       is_active = 0
       , updated_dt = :now
@@ -71,7 +71,7 @@ unactiveEvent(event_id, now) ::= <<
 >>
 
 reactiveEvent(event_id, now) ::= <<
-    update events
+    update entitlement_events
     set
       is_active = 1
       , updated_dt = :now
@@ -95,7 +95,7 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
       , plist_name
       , current_version
       , is_active
-    from events
+    from entitlement_events
     where
       subscription_id = :subscription_id
       and is_active = 1
@@ -123,7 +123,7 @@ getEventsForSubscription(subscription_id) ::= <<
       , plist_name
       , current_version
       , is_active
-    from events
+    from entitlement_events
     where
       subscription_id = :subscription_id
     order by
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2204274..c5881f9 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -58,7 +58,7 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
 
     public static interface ResetSqlDao extends Transactional<ResetSqlDao>, CloseMe {
 
-        @SqlUpdate("truncate table events")
+        @SqlUpdate("truncate table entitlement_events")
         public void resetEvents();
 
         @SqlUpdate("truncate table subscriptions")
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 3c19a2b..97d3bf9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,13 +38,13 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected void doProcessEvents(final int sequenceId) {
+    protected boolean doProcessEvents(final int sequenceId) {
 
         logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
-            return;
+            return false;
         }
 
         logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
@@ -58,6 +58,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             logDebug("done handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
+        return true;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4ea38f7..4826356 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,8 +38,9 @@ public interface NotificationQueue {
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
+    * @return true if we processed some active notifications
     */
-   public void processReadyNotification();
+   public boolean processReadyNotification();
 
    /**
     * Stops the queue. Blocks until queue is completely stopped.
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 9a42d2e..15679f6 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -57,10 +57,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
     // Use this object's monitor for synchronization (no need for volatile)
     protected boolean isProcessingEvents;
-    
+
     private boolean startedComplete = false;
     private boolean stoppedComplete = false;
-    
+
     // Package visibility on purpose
     NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         this.clock = clock;
@@ -88,8 +88,8 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
 
     @Override
-    public void processReadyNotification() {
-        doProcessEvents(sequenceId.incrementAndGet());
+    public boolean processReadyNotification() {
+        return doProcessEvents(sequenceId.incrementAndGet());
     }
 
 
@@ -181,14 +181,14 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         });
         waitForNotificationStartCompletion();
     }
-    
+
     private void completedQueueStop() {
     	synchronized (this) {
     		stoppedComplete = true;
             this.notifyAll();
         }
     }
-    
+
     private void completedQueueStart() {
         synchronized (this) {
         	startedComplete = true;
@@ -237,5 +237,5 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract void doProcessEvents(int sequenceId);
+    protected abstract boolean doProcessEvents(int sequenceId);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 5cb00aa..ee04781 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -76,7 +76,7 @@ public interface NotificationQueueService {
     /**
      *
      * @param services
-     * @return
+     * @return whether or not things were ready in the queue
      */
-    public void triggerManualQueueProcessing(final String [] services);
+    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 7833529..85d92c9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -83,8 +83,13 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
     }
 
 
+    //
+    // Test ONLY
+    //
     @Override
-    public void triggerManualQueueProcessing(final String [] services) {
+    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+
+        boolean result = false;
 
         List<NotificationQueue> manualQueues = null;
         if (services == null) {
@@ -102,8 +107,15 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
             }
         }
         for (NotificationQueue cur : manualQueues) {
-            cur.processReadyNotification();
+            boolean processedNotifications = true;
+            do {
+                processedNotifications = cur.processReadyNotification();
+                if (result == false) {
+                    result = processedNotifications;
+                }
+            } while(keepRunning && processedNotifications);
         }
+        return result;
     }
 
     private final void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index e1da366..922fb29 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,7 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected void doProcessEvents(int sequenceId) {
+    protected boolean doProcessEvents(int sequenceId) {
+
+        boolean result = false;
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
         List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -73,7 +75,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                     readyNotifications.add(cur);
                 }
             }
+
+            result = readyNotifications.size() > 0;
             for (Notification cur : readyNotifications) {
+
                 handler.handleReadyNotification(cur.getNotificationKey());
                 DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
                 oldNotifications.add(cur);
@@ -87,6 +92,6 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 notifications.addAll(processedNotifications);
             }
         }
-
+        return result;
     }
 }