killbill-memoizeit

Details

diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 9986bbf..3b27fa6 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -43,6 +43,7 @@ import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
 import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
 import com.ning.billing.invoice.model.RecurringInvoiceItem;
 import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
 import com.ning.billing.util.bus.Bus;
 import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
@@ -60,19 +61,19 @@ public class DefaultInvoiceDao implements InvoiceDao {
 
     private final Bus eventBus;
 
-	private NotificationQueueService notificationQueueService;
+	private NextBillingDatePoster nextBillingDatePoster;
 
     @Inject
     public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus,
                              final EntitlementBillingApi entitlementBillingApi,
-                             NotificationQueueService notificationQueueService) {
+                             NextBillingDatePoster nextBillingDatePoster) {
         this.invoiceSqlDao = dbi.onDemand(InvoiceSqlDao.class);
         this.recurringInvoiceItemSqlDao = dbi.onDemand(RecurringInvoiceItemSqlDao.class);
         this.fixedPriceInvoiceItemSqlDao = dbi.onDemand(FixedPriceInvoiceItemSqlDao.class);
         this.invoicePaymentSqlDao = dbi.onDemand(InvoicePaymentSqlDao.class);
         this.eventBus = eventBus;
         this.entitlementBillingApi = entitlementBillingApi;
-        this.notificationQueueService = notificationQueueService;
+        this.nextBillingDatePoster = nextBillingDatePoster;
     }
 
     @Override
@@ -282,30 +283,12 @@ public class DefaultInvoiceDao implements InvoiceDao {
                 if ((recurringInvoiceItem.getEndDate() != null) &&
                         (recurringInvoiceItem.getAmount() == null ||
                                 recurringInvoiceItem.getAmount().compareTo(BigDecimal.ZERO) >= 0)) {
-                insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
+                	nextBillingDatePoster.insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
                 }
             }
         }
     }
     
-    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
-    	NotificationQueue nextBillingQueue;
-		try {
-			nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
-					DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
-			 log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
-
-	            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
-	                @Override
-	                public String toString() {
-	                    return subscriptionId.toString();
-	                }
-	    	    });
-		} catch (NoSuchNotificationQueue e) {
-			log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
-		}
-    }
-
     private void setChargedThroughDates(final InvoiceSqlDao dao, final Collection<InvoiceItem> fixedPriceItems,
                                         final Collection<InvoiceItem> recurringItems) {
         Map<UUID, DateTime> chargeThroughDates = new HashMap<UUID, DateTime>();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
index 20765eb..1dfac5b 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
@@ -16,8 +16,8 @@
 
 package com.ning.billing.invoice.glue;
 
-import com.ning.billing.util.glue.GlobalLockerModule;
 import org.skife.config.ConfigurationObjectFactory;
+
 import com.google.inject.AbstractModule;
 import com.ning.billing.config.InvoiceConfig;
 import com.ning.billing.invoice.InvoiceListener;
@@ -32,8 +32,11 @@ import com.ning.billing.invoice.dao.InvoiceDao;
 import com.ning.billing.invoice.model.DefaultInvoiceGenerator;
 import com.ning.billing.invoice.model.InvoiceGenerator;
 import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.DefaultNextBillingDatePoster;
 import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
 import com.ning.billing.util.glue.ClockModule;
+import com.ning.billing.util.glue.GlobalLockerModule;
 
 
 public class InvoiceModule extends AbstractModule {
@@ -64,6 +67,7 @@ public class InvoiceModule extends AbstractModule {
 
     protected void installNotifier() {
         bind(NextBillingDateNotifier.class).to(DefaultNextBillingDateNotifier.class).asEagerSingleton();
+        bind(NextBillingDatePoster.class).to(DefaultNextBillingDatePoster.class).asEagerSingleton();
     }
 
     protected void installInvoiceListener() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
new file mode 100644
index 0000000..8ddb29d
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
+    private final static Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+	private final NotificationQueueService notificationQueueService;
+
+	@Inject
+    public DefaultNextBillingDatePoster(
+			NotificationQueueService notificationQueueService) {
+		super();
+		this.notificationQueueService = notificationQueueService;
+	}
+
+	@Override
+	public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
+    	NotificationQueue nextBillingQueue;
+		try {
+			nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
+					DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+			 log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
+
+	            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
+	                @Override
+	                public String toString() {
+	                    return subscriptionId.toString();
+	                }
+	    	    });
+		} catch (NoSuchNotificationQueue e) {
+			log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
+		}
+    }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
index febdc75..ea630aa 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -16,10 +16,6 @@
 
 package com.ning.billing.invoice.notification;
 
-import java.util.UUID;
-
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 public interface NextBillingDateNotifier {
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
new file mode 100644
index 0000000..67598d1
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+public interface NextBillingDatePoster {
+
+	void insertNextBillingNotification(Transmogrifier transactionalDao,
+			UUID subscriptionId, DateTime futureNotificationTime);
+
+}
\ No newline at end of file
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 4fe003c..67c27fa 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -74,6 +74,7 @@ public class TestNextBillingDateNotifier {
 	private Bus eventBus;
 	private MysqlTestingHelper helper;
 	private InvoiceListenerMock listener = new InvoiceListenerMock();
+	private NotificationQueueService notificationQueueService;
 
 	private static final class InvoiceListenerMock extends InvoiceListener {
 		int eventCount = 0;
@@ -251,7 +252,8 @@ public class TestNextBillingDateNotifier {
         dao = dbi.onDemand(DummySqlTest.class);
         eventBus = g.getInstance(Bus.class);
         helper = g.getInstance(MysqlTestingHelper.class);
-        notifier = new DefaultNextBillingDateNotifier(g.getInstance(NotificationQueueService.class),g.getInstance(InvoiceConfig.class), new MockEntitlementDao(), listener);
+        notificationQueueService = g.getInstance(NotificationQueueService.class);
+        notifier = new DefaultNextBillingDateNotifier(notificationQueueService,g.getInstance(InvoiceConfig.class), new MockEntitlementDao(), listener);
         startMysql();
 	}
 
@@ -266,22 +268,24 @@ public class TestNextBillingDateNotifier {
 	}
 
 
-	@Test(enabled=false, groups="slow")
+	@Test(enabled=true, groups="slow")
 	public void test() throws Exception {
 		final UUID subscriptionId = new UUID(0L,1L);
 		final DateTime now = new DateTime();
 		final DateTime readyTime = now.plusMillis(2000);
+		final NextBillingDatePoster poster = new DefaultNextBillingDatePoster(notificationQueueService); 
 
 		eventBus.start();
 		notifier.initialize();
 		notifier.start();
+		
 
 		dao.inTransaction(new Transaction<Void, DummySqlTest>() {
 			@Override
 			public Void inTransaction(DummySqlTest transactional,
 					TransactionStatus status) throws Exception {
-				//DefaultInvoiceDao insertDao = new DefaultInvoiceDao();
-				((DefaultInvoiceDao)dao).insertNextBillingNotification(transactional, subscriptionId, readyTime);
+
+				poster.insertNextBillingNotification(transactional, subscriptionId, readyTime);
 				return null;
 			}
 		});
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 9e4460b..31105c8 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class InMemoryBus implements Bus {
 
-    // STEPH config ?
-    private final static int MAX_EVENT_THREADS = 1;
-
     private final static String EVENT_BUS_IDENTIFIER = "bus-service";
     private final static String EVENT_BUS_GROUP_NAME = "bus-grp";
     private final static String EVENT_BUS_TH_NAME = "bus-th";
@@ -68,7 +65,7 @@ public class InMemoryBus implements Bus {
     public InMemoryBus() {
 
         final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
-        Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
+        Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(group, r, EVENT_BUS_TH_NAME);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 4b4aa89..8e2aaf8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,27 +38,29 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    protected boolean doProcessEvents(final int sequenceId) {
+    protected int doProcessEvents(final int sequenceId) {
 
         logDebug("ENTER doProcessEvents");
         List<Notification> notifications = getReadyNotifications(sequenceId);
         if (notifications.size() == 0) {
             logDebug("EXIT doProcessEvents");
-            return false;
+            return 0;
         }
 
         logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
 
+        int result = 0;
         for (final Notification cur : notifications) {
             nbProcessedEvents.incrementAndGet();
             logDebug("handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
             handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+            result++;
             clearNotification(cur);
             logDebug("done handling notification %s, key = %s for time %s",
                     cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
         }
-        return true;
+        return result;
     }
 
     @Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4826356..e1dcdbf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,9 +38,9 @@ public interface NotificationQueue {
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
-    * @return true if we processed some active notifications
+    * @return the number of entries we processed
     */
-   public boolean processReadyNotification();
+   public int processReadyNotification();
 
    /**
     * Stops the queue. Blocks until queue is completely stopped.
@@ -56,4 +56,10 @@ public interface NotificationQueue {
     */
    public void startQueue();
 
+   /**
+    *
+    * @return the name of that queue
+    */
+   public String getFullQName();
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 15679f6..cc1ea28 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -88,7 +88,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
 
 
     @Override
-    public boolean processReadyNotification() {
+    public int processReadyNotification() {
         return doProcessEvents(sequenceId.incrementAndGet());
     }
 
@@ -233,9 +233,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
         }
     }
 
-    protected String getFullQName() {
+    @Override
+    public String getFullQName() {
         return svcName + ":" +  queueName;
     }
 
-    protected abstract boolean doProcessEvents(int sequenceId);
+    protected abstract int doProcessEvents(int sequenceId);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 72816f2..4d56b03 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -78,7 +78,7 @@ public interface NotificationQueueService {
     /**
      *
      * @param services
-     * @return whether or not things were ready in the queue
+     * @return the number of processed notifications
      */
-    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 85d92c9..3f8f26f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -87,9 +87,9 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
     // Test ONLY
     //
     @Override
-    public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+    public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
 
-        boolean result = false;
+        int result = 0;
 
         List<NotificationQueue> manualQueues = null;
         if (services == null) {
@@ -107,13 +107,12 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
             }
         }
         for (NotificationQueue cur : manualQueues) {
-            boolean processedNotifications = true;
+            int processedNotifications = 0;
             do {
                 processedNotifications = cur.processReadyNotification();
-                if (result == false) {
-                    result = processedNotifications;
-                }
-            } while(keepRunning && processedNotifications);
+                log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
+                result += processedNotifications;
+            } while(keepRunning && processedNotifications > 0);
         }
         return result;
     }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c3eecc0..e96d2cf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,9 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    protected boolean doProcessEvents(int sequenceId) {
+    protected int doProcessEvents(int sequenceId) {
 
-        boolean result = false;
+        int result = 0;
 
         List<Notification> processedNotifications = new ArrayList<Notification>();
         List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -76,7 +76,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
                 }
             }
 
-            result = readyNotifications.size() > 0;
+            result = readyNotifications.size();
             for (Notification cur : readyNotifications) {
                 handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
                 DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());