killbill-memoizeit
Changes
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java 63(+63 -0)
Details
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 9986bbf..3b27fa6 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -43,6 +43,7 @@ import com.ning.billing.invoice.api.user.DefaultInvoiceCreationNotification;
import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
import com.ning.billing.invoice.model.RecurringInvoiceItem;
import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
@@ -60,19 +61,19 @@ public class DefaultInvoiceDao implements InvoiceDao {
private final Bus eventBus;
- private NotificationQueueService notificationQueueService;
+ private NextBillingDatePoster nextBillingDatePoster;
@Inject
public DefaultInvoiceDao(final IDBI dbi, final Bus eventBus,
final EntitlementBillingApi entitlementBillingApi,
- NotificationQueueService notificationQueueService) {
+ NextBillingDatePoster nextBillingDatePoster) {
this.invoiceSqlDao = dbi.onDemand(InvoiceSqlDao.class);
this.recurringInvoiceItemSqlDao = dbi.onDemand(RecurringInvoiceItemSqlDao.class);
this.fixedPriceInvoiceItemSqlDao = dbi.onDemand(FixedPriceInvoiceItemSqlDao.class);
this.invoicePaymentSqlDao = dbi.onDemand(InvoicePaymentSqlDao.class);
this.eventBus = eventBus;
this.entitlementBillingApi = entitlementBillingApi;
- this.notificationQueueService = notificationQueueService;
+ this.nextBillingDatePoster = nextBillingDatePoster;
}
@Override
@@ -282,30 +283,12 @@ public class DefaultInvoiceDao implements InvoiceDao {
if ((recurringInvoiceItem.getEndDate() != null) &&
(recurringInvoiceItem.getAmount() == null ||
recurringInvoiceItem.getAmount().compareTo(BigDecimal.ZERO) >= 0)) {
- insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
+ nextBillingDatePoster.insertNextBillingNotification(dao, item.getSubscriptionId(), recurringInvoiceItem.getEndDate());
}
}
}
}
- public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
- NotificationQueue nextBillingQueue;
- try {
- nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
- DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
- log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
-
- nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
- @Override
- public String toString() {
- return subscriptionId.toString();
- }
- });
- } catch (NoSuchNotificationQueue e) {
- log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
- }
- }
-
private void setChargedThroughDates(final InvoiceSqlDao dao, final Collection<InvoiceItem> fixedPriceItems,
final Collection<InvoiceItem> recurringItems) {
Map<UUID, DateTime> chargeThroughDates = new HashMap<UUID, DateTime>();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
index 20765eb..1dfac5b 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/glue/InvoiceModule.java
@@ -16,8 +16,8 @@
package com.ning.billing.invoice.glue;
-import com.ning.billing.util.glue.GlobalLockerModule;
import org.skife.config.ConfigurationObjectFactory;
+
import com.google.inject.AbstractModule;
import com.ning.billing.config.InvoiceConfig;
import com.ning.billing.invoice.InvoiceListener;
@@ -32,8 +32,11 @@ import com.ning.billing.invoice.dao.InvoiceDao;
import com.ning.billing.invoice.model.DefaultInvoiceGenerator;
import com.ning.billing.invoice.model.InvoiceGenerator;
import com.ning.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import com.ning.billing.invoice.notification.DefaultNextBillingDatePoster;
import com.ning.billing.invoice.notification.NextBillingDateNotifier;
+import com.ning.billing.invoice.notification.NextBillingDatePoster;
import com.ning.billing.util.glue.ClockModule;
+import com.ning.billing.util.glue.GlobalLockerModule;
public class InvoiceModule extends AbstractModule {
@@ -64,6 +67,7 @@ public class InvoiceModule extends AbstractModule {
protected void installNotifier() {
bind(NextBillingDateNotifier.class).to(DefaultNextBillingDateNotifier.class).asEagerSingleton();
+ bind(NextBillingDatePoster.class).to(DefaultNextBillingDatePoster.class).asEagerSingleton();
}
protected void installInvoiceListener() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
new file mode 100644
index 0000000..8ddb29d
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
+ private final static Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+ private final NotificationQueueService notificationQueueService;
+
+ @Inject
+ public DefaultNextBillingDatePoster(
+ NotificationQueueService notificationQueueService) {
+ super();
+ this.notificationQueueService = notificationQueueService;
+ }
+
+ @Override
+ public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
+ NotificationQueue nextBillingQueue;
+ try {
+ nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
+ DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
+
+ nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
+ @Override
+ public String toString() {
+ return subscriptionId.toString();
+ }
+ });
+ } catch (NoSuchNotificationQueue e) {
+ log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
+ }
+ }
+}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
index febdc75..ea630aa 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDateNotifier.java
@@ -16,10 +16,6 @@
package com.ning.billing.invoice.notification;
-import java.util.UUID;
-
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
public interface NextBillingDateNotifier {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
new file mode 100644
index 0000000..67598d1
--- /dev/null
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.invoice.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+public interface NextBillingDatePoster {
+
+ void insertNextBillingNotification(Transmogrifier transactionalDao,
+ UUID subscriptionId, DateTime futureNotificationTime);
+
+}
\ No newline at end of file
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 4fe003c..67c27fa 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -74,6 +74,7 @@ public class TestNextBillingDateNotifier {
private Bus eventBus;
private MysqlTestingHelper helper;
private InvoiceListenerMock listener = new InvoiceListenerMock();
+ private NotificationQueueService notificationQueueService;
private static final class InvoiceListenerMock extends InvoiceListener {
int eventCount = 0;
@@ -251,7 +252,8 @@ public class TestNextBillingDateNotifier {
dao = dbi.onDemand(DummySqlTest.class);
eventBus = g.getInstance(Bus.class);
helper = g.getInstance(MysqlTestingHelper.class);
- notifier = new DefaultNextBillingDateNotifier(g.getInstance(NotificationQueueService.class),g.getInstance(InvoiceConfig.class), new MockEntitlementDao(), listener);
+ notificationQueueService = g.getInstance(NotificationQueueService.class);
+ notifier = new DefaultNextBillingDateNotifier(notificationQueueService,g.getInstance(InvoiceConfig.class), new MockEntitlementDao(), listener);
startMysql();
}
@@ -266,22 +268,24 @@ public class TestNextBillingDateNotifier {
}
- @Test(enabled=false, groups="slow")
+ @Test(enabled=true, groups="slow")
public void test() throws Exception {
final UUID subscriptionId = new UUID(0L,1L);
final DateTime now = new DateTime();
final DateTime readyTime = now.plusMillis(2000);
+ final NextBillingDatePoster poster = new DefaultNextBillingDatePoster(notificationQueueService);
eventBus.start();
notifier.initialize();
notifier.start();
+
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@Override
public Void inTransaction(DummySqlTest transactional,
TransactionStatus status) throws Exception {
- //DefaultInvoiceDao insertDao = new DefaultInvoiceDao();
- ((DefaultInvoiceDao)dao).insertNextBillingNotification(transactional, subscriptionId, readyTime);
+
+ poster.insertNextBillingNotification(transactional, subscriptionId, readyTime);
return null;
}
});
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 9e4460b..31105c8 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class InMemoryBus implements Bus {
- // STEPH config ?
- private final static int MAX_EVENT_THREADS = 1;
-
private final static String EVENT_BUS_IDENTIFIER = "bus-service";
private final static String EVENT_BUS_GROUP_NAME = "bus-grp";
private final static String EVENT_BUS_TH_NAME = "bus-th";
@@ -68,7 +65,7 @@ public class InMemoryBus implements Bus {
public InMemoryBus() {
final ThreadGroup group = new ThreadGroup(EVENT_BUS_GROUP_NAME);
- Executor executor = Executors.newFixedThreadPool(MAX_EVENT_THREADS, new ThreadFactory() {
+ Executor executor = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(group, r, EVENT_BUS_TH_NAME);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 4b4aa89..8e2aaf8 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -38,27 +38,29 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
}
@Override
- protected boolean doProcessEvents(final int sequenceId) {
+ protected int doProcessEvents(final int sequenceId) {
logDebug("ENTER doProcessEvents");
List<Notification> notifications = getReadyNotifications(sequenceId);
if (notifications.size() == 0) {
logDebug("EXIT doProcessEvents");
- return false;
+ return 0;
}
logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+ int result = 0;
for (final Notification cur : notifications) {
nbProcessedEvents.incrementAndGet();
logDebug("handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+ result++;
clearNotification(cur);
logDebug("done handling notification %s, key = %s for time %s",
cur.getUUID(), cur.getNotificationKey(), cur.getEffectiveDate());
}
- return true;
+ return result;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 4826356..e1dcdbf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -38,9 +38,9 @@ public interface NotificationQueue {
* This is only valid when the queue has been configured with isNotificationProcessingOff is true
* In which case, it will callback users for all the ready notifications.
*
- * @return true if we processed some active notifications
+ * @return the number of entries we processed
*/
- public boolean processReadyNotification();
+ public int processReadyNotification();
/**
* Stops the queue. Blocks until queue is completely stopped.
@@ -56,4 +56,10 @@ public interface NotificationQueue {
*/
public void startQueue();
+ /**
+ *
+ * @return the name of that queue
+ */
+ public String getFullQName();
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
index 15679f6..cc1ea28 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -88,7 +88,7 @@ public abstract class NotificationQueueBase implements NotificationQueue {
@Override
- public boolean processReadyNotification() {
+ public int processReadyNotification() {
return doProcessEvents(sequenceId.incrementAndGet());
}
@@ -233,9 +233,10 @@ public abstract class NotificationQueueBase implements NotificationQueue {
}
}
- protected String getFullQName() {
+ @Override
+ public String getFullQName() {
return svcName + ":" + queueName;
}
- protected abstract boolean doProcessEvents(int sequenceId);
+ protected abstract int doProcessEvents(int sequenceId);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 72816f2..4d56b03 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -78,7 +78,7 @@ public interface NotificationQueueService {
/**
*
* @param services
- * @return whether or not things were ready in the queue
+ * @return the number of processed notifications
*/
- public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
+ public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 85d92c9..3f8f26f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -87,9 +87,9 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
// Test ONLY
//
@Override
- public boolean triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
+ public int triggerManualQueueProcessing(final String [] services, final Boolean keepRunning) {
- boolean result = false;
+ int result = 0;
List<NotificationQueue> manualQueues = null;
if (services == null) {
@@ -107,13 +107,12 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
for (NotificationQueue cur : manualQueues) {
- boolean processedNotifications = true;
+ int processedNotifications = 0;
do {
processedNotifications = cur.processReadyNotification();
- if (result == false) {
- result = processedNotifications;
- }
- } while(keepRunning && processedNotifications);
+ log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
+ result += processedNotifications;
+ } while(keepRunning && processedNotifications > 0);
}
return result;
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c3eecc0..e96d2cf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -59,9 +59,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- protected boolean doProcessEvents(int sequenceId) {
+ protected int doProcessEvents(int sequenceId) {
- boolean result = false;
+ int result = 0;
List<Notification> processedNotifications = new ArrayList<Notification>();
List<Notification> oldNotifications = new ArrayList<Notification>();
@@ -76,7 +76,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
}
- result = readyNotifications.size() > 0;
+ result = readyNotifications.size();
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getUUID(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());