killbill-uncached
Changes
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java 61(+44 -17)
overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java 26(+19 -7)
Details
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 4090fc6..946d8b6 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,6 +16,7 @@
package com.ning.billing.ovedue.notification;
+import java.util.ArrayList;
import java.util.List;
import org.joda.time.DateTime;
@@ -35,11 +36,12 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.Notification;
-import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.util.queue.PersistentQueueBase;
+import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
@@ -63,9 +65,6 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
try {
checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
- log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
-
- final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
@@ -73,10 +72,10 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
boolean shouldInsertNewNotification = true;
// Check if we already have notifications for that key
- final List<Notification> futureNotificationsForKey = checkOverdueQueue.getFutureNotificationsForKeyFromTransaction(entitySqlDaoWrapperFactory, notificationKey, context);
- if (futureNotificationsForKey.size() > 0) {
+ final List<Notification> futureNotifications = getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue, overdueable, context);
+ if (futureNotifications.size() > 0) {
// Results are ordered by effective date asc
- final DateTime earliestExistingNotificationDate = futureNotificationsForKey.get(0).getEffectiveDate();
+ final DateTime earliestExistingNotificationDate = futureNotifications.get(0).getEffectiveDate();
final int minIndexToDeleteFrom;
if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
@@ -88,13 +87,17 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
minIndexToDeleteFrom = 0;
}
- for (int i = minIndexToDeleteFrom; i < futureNotificationsForKey.size(); i++) {
- checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationsForKey.get(i).getId(), context);
+ for (int i = minIndexToDeleteFrom; i < futureNotifications.size(); i++) {
+ checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotifications.get(i).getId(), context);
}
}
if (shouldInsertNewNotification) {
+ log.info("Queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
+ final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, notificationKey, context);
+ } else {
+ log.info("Skipping queuing overdue check notification. Overdueable id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
}
return null;
@@ -107,20 +110,44 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
@Override
public void clearNotificationsFor(final Blockable overdueable, final InternalCallContext context) {
- final NotificationQueue checkOverdueQueue;
try {
- checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
- DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
- final NotificationKey key = new NotificationKey() {
+ final NotificationQueue checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+ DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+
@Override
- public String toString() {
- return overdueable.getId().toString();
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ final List<Notification> futureNotifications = getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue, overdueable, context);
+ for (final Notification notification : futureNotifications) {
+ checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, notification.getId(), context);
+ }
+
+ return null;
}
- };
- checkOverdueQueue.removeNotificationsByKey(key, context);
+ });
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to clear items from a non-existent queue (DefaultOverdueCheck).", e);
}
}
+ @VisibleForTesting
+ List<Notification> getFutureNotificationsForAccountAndOverdueableInTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory,
+ final NotificationQueue checkOverdueQueue,
+ final Blockable overdueable,
+ final InternalCallContext context) {
+ final List<Notification> notifications = new ArrayList<Notification>();
+
+ final List<Notification> candidates = checkOverdueQueue.getFutureNotificationsForAccountFromTransaction(entitySqlDaoWrapperFactory, context);
+ for (final Notification candidate : candidates) {
+ if (OverdueCheckNotificationKey.class.getName().equals(candidate.getNotificationKeyClass())) {
+ final OverdueCheckNotificationKey key = PersistentQueueBase.deserializeEvent(candidate.getNotificationKeyClass(), candidate.getNotificationKey());
+
+ if (Type.get(overdueable).equals(key.getType()) && overdueable.getId().equals(key.getUuidKey())) {
+ notifications.add(candidate);
+ }
+ }
+ }
+
+ return notifications;
+ }
}
diff --git a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
index 4f4363d..d84cbb5 100644
--- a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
@@ -29,9 +29,12 @@ import org.testng.annotations.Test;
import com.ning.billing.entitlement.api.user.Subscription;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.junction.api.Blockable;
-import com.ning.billing.junction.api.Blockable.Type;
import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.entity.dao.EntitySqlDao;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.jackson.ObjectMapper;
import com.ning.billing.util.notificationq.Notification;
import com.ning.billing.util.notificationq.NotificationQueue;
@@ -40,11 +43,14 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private EntitySqlDaoTransactionalJdbiWrapper entitySqlDaoTransactionalJdbiWrapper;
private NotificationQueue overdueQueue;
private DateTime testReferenceTime;
@BeforeMethod(groups = "slow")
public void setUp() throws Exception {
+ entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(getDBI(), clock, cacheControllerDispatcher, nonEntityDao);
+
overdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
Assert.assertTrue(overdueQueue.isStarted());
@@ -71,11 +77,8 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
verifyQueueContent(otherOverdueable, 5, 5);
verifyQueueContent(otherOverdueable, 15, 5);
- // Verify the final content of the queue for each key
- final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(subscriptionId, Type.SUBSCRIPTION);
- Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext).size(), 1);
- final OverdueCheckNotificationKey otherNotificationKey = new OverdueCheckNotificationKey(bundleId, Type.SUBSCRIPTION_BUNDLE);
- Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(otherNotificationKey, internalCallContext).size(), 1);
+ // Verify the final content of the queue
+ Assert.assertEquals(overdueQueue.getFutureNotificationsForAccount(internalCallContext).size(), 2);
}
private void verifyQueueContent(final Blockable overdueable, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
@@ -83,9 +86,18 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
poster.insertOverdueCheckNotification(overdueable, futureNotificationTime, internalCallContext);
final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable));
- final List<Notification> notificationsForKey = overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext);
+ final List<Notification> notificationsForKey = getNotificationsForOverdueable(overdueable);
Assert.assertEquals(notificationsForKey.size(), 1);
Assert.assertEquals(notificationsForKey.get(0).getNotificationKey(), objectMapper.writeValueAsString(notificationKey));
Assert.assertEquals(notificationsForKey.get(0).getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
}
+
+ private List<Notification> getNotificationsForOverdueable(final Blockable overdueable) {
+ return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<List<Notification>>() {
+ @Override
+ public List<Notification> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ return ((DefaultOverdueCheckPoster) poster).getFutureNotificationsForAccountAndOverdueableInTransaction(entitySqlDaoWrapperFactory, overdueQueue, overdueable, internalCallContext);
+ }
+ });
+ }
}
diff --git a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
index a3c4e12..b7ab466 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestSuiteWithEmbeddedDB.java
@@ -30,7 +30,9 @@ import com.ning.billing.overdue.calculator.BillingStateCalculatorBundle;
import com.ning.billing.overdue.glue.TestOverdueModuleWithEmbeddedDB;
import com.ning.billing.overdue.service.DefaultOverdueService;
import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.dao.NonEntityDao;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -58,6 +60,8 @@ public abstract class OverdueTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
@Inject
protected EntitlementInternalApi entitlementApi;
@Inject
+ protected CacheControllerDispatcher cacheControllerDispatcher;
+ @Inject
protected InternalBus bus;
@Inject
protected InternalCallContextFactory internalCallContextFactory;
@@ -80,6 +84,8 @@ public abstract class OverdueTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
@Inject
protected OverdueWrapperFactory overdueWrapperFactory;
@Inject
+ protected NonEntityDao nonEntityDao;
+ @Inject
protected TestOverdueHelper testOverdueHelper;
@BeforeClass(groups = "slow")
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 78c8a35..38a3086 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -32,7 +32,7 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
-import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import com.ning.billing.util.callcontext.InternalCallContext;
@@ -44,12 +44,9 @@ import com.ning.billing.util.notificationq.DefaultNotification;
import com.ning.billing.util.notificationq.Notification;
import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
-@ExternalizedSqlViaStringTemplate3()
+@UseStringTemplate3StatementLocator()
public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
- //
- // APIs for event notifications
- //
@SqlQuery
@Mapper(NotificationSqlMapper.class)
public List<Notification> getReadyNotifications(@Bind("now") Date now,
@@ -63,10 +60,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@SqlQuery
@Mapper(NotificationSqlMapper.class)
- public List<Notification> getFutureNotificationsForKey(@Bind("notificationKey") String key,
- @Bind("className") String className,
- @Bind("queueName") String queueName,
- @InternalTenantContextBinder final InternalTenantContext context);
+ public List<Notification> getFutureNotificationsForAccount(@Bind("now") Date now,
+ @Bind("queueName") String queueName,
+ @InternalTenantContextBinder final InternalTenantContext context);
@SqlUpdate
public void removeNotification(@Bind("id") String id,
@@ -85,10 +81,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
- public void removeNotificationsByKey(@Bind("notificationKey") String key,
- @InternalTenantContextBinder final InternalCallContext context);
-
- @SqlUpdate
public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt,
@InternalTenantContextBinder final InternalCallContext context);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index a414463..563939e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -22,13 +22,10 @@ import java.util.UUID;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.ning.billing.util.Hostname;
-import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
@@ -38,36 +35,28 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class DefaultNotificationQueue implements NotificationQueue {
- private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
-
private final NotificationSqlDao dao;
private final String hostname;
-
private final String svcName;
private final String queueName;
-
- private final ObjectMapper objectMapper;
-
private final NotificationQueueHandler handler;
-
private final NotificationQueueService notificationQueueService;
- private final NonEntityDao nonEntityDao;
- private final CacheControllerDispatcher cacheControllerDispatcher;
+ private final ObjectMapper objectMapper;
+ private final Clock clock;
private volatile boolean isStarted;
public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
final IDBI dbi, final NotificationQueueService notificationQueueService,
- final NonEntityDao nonEntityDao, final CacheControllerDispatcher cacheControllerDispatcher) {
+ final Clock clock) {
this.svcName = svcName;
this.queueName = queueName;
this.handler = handler;
- this.nonEntityDao = nonEntityDao;
- this.cacheControllerDispatcher = cacheControllerDispatcher;
this.dao = dbi.onDemand(NotificationSqlDao.class);
this.hostname = Hostname.get();
this.notificationQueueService = notificationQueueService;
this.objectMapper = new ObjectMapper();
+ this.clock = clock;
}
@Override
@@ -99,31 +88,19 @@ public class DefaultNotificationQueue implements NotificationQueue {
}
@Override
- public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
- // TODO Pierre Don't we want to check for the notification key class and queue name as well?
- dao.removeNotificationsByKey(notificationKey.toString(), context);
- }
-
- @Override
- public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
- return getFutureNotificationsForKeyInternal(dao, notificationKey, context);
+ public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context) {
+ return getFutureNotificationsForAccountInternal(dao, context);
}
@Override
- public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
- final NotificationKey notificationKey, final InternalCallContext context) {
+ public List<Notification> getFutureNotificationsForAccountFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final InternalCallContext context) {
final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
- return getFutureNotificationsForKeyInternal(transactionalNotificationDao, notificationKey, context);
+ return getFutureNotificationsForAccountInternal(transactionalNotificationDao, context);
}
- private List<Notification> getFutureNotificationsForKeyInternal(final NotificationSqlDao transactionalDao,
- final NotificationKey notificationKey, final InternalCallContext context) {
- try {
- final String json = objectMapper.writeValueAsString(notificationKey);
- return transactionalDao.getFutureNotificationsForKey(json, notificationKey.getClass().getName(), getFullQName(), context);
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
+ private List<Notification> getFutureNotificationsForAccountInternal(final NotificationSqlDao transactionalDao, final InternalCallContext context) {
+ return transactionalDao.getFutureNotificationsForAccount(clock.getUTCNow().toDate(), getFullQName(), context);
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index d1a31db..88d17cd 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -18,34 +18,26 @@ package com.ning.billing.util.notificationq;
import org.skife.jdbi.v2.IDBI;
-import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.dao.NonEntityDao;
import com.google.inject.Inject;
public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
private final IDBI dbi;
- private final NonEntityDao nonEntityDao;
- private final CacheControllerDispatcher cacheControllerDispatcher;
-
@Inject
public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
- final InternalCallContextFactory internalCallContextFactory, final NonEntityDao nonEntityDao, final CacheControllerDispatcher cacheControllerDispatcher) {
+ final InternalCallContextFactory internalCallContextFactory) {
super(clock, config, dbi, internalCallContextFactory);
this.dbi = dbi;
- this.nonEntityDao = nonEntityDao;
- this.cacheControllerDispatcher = cacheControllerDispatcher;
}
-
@Override
protected NotificationQueue createNotificationQueueInternal(final String svcName,
final String queueName,
final NotificationQueueHandler handler) {
- return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this, nonEntityDao, cacheControllerDispatcher);
+ return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this, clock);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index f009a6c..afb4cc5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -55,33 +55,23 @@ public interface NotificationQueue extends QueueLifecycle {
throws IOException;
/**
- * Remove all notifications associated with this key
- */
- public void removeNotificationsByKey(final NotificationKey notificationKey,
- final InternalCallContext context);
-
- /**
- * Retrieve all future pending notifications for a given key
+ * Retrieve all future pending notifications for a given account (taken from the context)
* Results are ordered by effective date asc.
*
- * @param notificationKey notification key to look for
- * @param context internal call context
+ * @param context internal call context
* @return future notifications matching that key
*/
- public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey,
- final InternalCallContext context);
+ public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context);
/**
- * Retrieve all future pending notifications for a given key in a transaction.
+ * Retrieve all future pending notifications for a given account (taken from the context) in a transaction.
* Results are ordered by effective date asc.
*
- * @param notificationKey notification key to look for
- * @param context internal call context
+ * @param context internal call context
* @return future notifications matching that key
*/
- public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
- final NotificationKey notificationKey,
- final InternalCallContext context);
+ public List<Notification> getFutureNotificationsForAccountFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final InternalCallContext context);
public void removeNotification(final UUID notificationId,
final InternalCallContext context);
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index d3706bb..247cf8f 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -41,7 +41,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private boolean isProcessingEvents;
private int curActiveThreads;
- protected final ObjectMapper objectMapper;
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -54,7 +54,6 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
this.nbThreads = nbThreads;
this.svcQName = svcQName;
this.config = config;
- this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
this.isProcessingSuspended = new AtomicBoolean(false);
@@ -188,8 +187,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
return isProcessingSuspended.get();
}
-
- protected <T> T deserializeEvent(final String className, final String json) {
+ // TODO PIERRE Better API?
+ public static <T> T deserializeEvent(final String className, final String json) {
try {
final Class<?> claz = Class.forName(className);
return (T) objectMapper.readValue(json, claz);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 71482a8..3afc5e3 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -44,7 +44,7 @@ getPendingCountNotifications() ::= <<
;
>>
-getFutureNotificationsForKey() ::= <<
+getFutureNotificationsForAccount() ::= <<
select
record_id
, id
@@ -62,10 +62,10 @@ select
, account_record_id
, tenant_record_id
from notifications
-where notification_key = :notificationKey
-and class_name = :className
-and queue_name = :queueName
+where queue_name = :queueName
and processing_state != 'REMOVED'
+and account_record_id = :accountRecordId
+and effective_date >= :now
order by effective_date, record_id
;
>>
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index c62a365..4c5c817 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -88,33 +88,17 @@ public class MockNotificationQueue implements NotificationQueue {
}
@Override
- public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
- final List<Notification> toClearNotifications = new ArrayList<Notification>();
- for (final Notification notification : notifications) {
- if (notification.getNotificationKey().equals(key.toString())) {
- toClearNotifications.add(notification);
- }
- }
-
- synchronized (notifications) {
- if (toClearNotifications.size() > 0) {
- notifications.removeAll(toClearNotifications);
- }
- }
- }
-
- @Override
- public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
- return getFutureNotificationsForKeyFromTransaction(null, notificationKey, context);
+ public List<Notification> getFutureNotificationsForAccount(final InternalCallContext context) {
+ return getFutureNotificationsForAccountFromTransaction(null, context);
}
@Override
- public List<Notification> getFutureNotificationsForKeyFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
- final NotificationKey notificationKey, final InternalCallContext context) {
+ public List<Notification> getFutureNotificationsForAccountFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final InternalCallContext context) {
final List<Notification> result = new ArrayList<Notification>();
synchronized (notifications) {
for (final Notification notification : notifications) {
- if (notificationKey.toString().equals(notification.getNotificationKey()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
+ if (notification.getAccountRecordId().equals(context.getAccountRecordId()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
result.add(notification);
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 6bd9ed1..3018299 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
@@ -90,6 +89,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
super.setup();
entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(getDBI(), clock, cacheControllerDispatcher, nonEntityDao);
}
+
@Override
@BeforeMethod(groups = "slow")
public void setupTest() throws Exception {
@@ -341,95 +341,4 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred));
Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
}
-
- @Test(groups = "slow")
- public void testRemoveNotifications() throws Exception {
- final UUID key = UUID.randomUUID();
- final NotificationKey notificationKey = new TestNotificationKey(key.toString());
- final UUID key2 = UUID.randomUUID();
- final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
-
- final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
- "remove",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
- if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
- log.info("Received notification with key: " + notificationKey);
- eventsReceived++;
- }
- }
- });
- queue.startQueue();
-
- final DateTime start = clock.getUTCNow().plusHours(1);
- final int nextReadyTimeIncrementMs = 1000;
-
- // add 3 events
-
- entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
- @Override
- public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), notificationKey, internalCallContext);
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), notificationKey, internalCallContext);
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), notificationKey2, internalCallContext);
- return null;
- }
- });
-
- queue.removeNotificationsByKey(notificationKey, internalCallContext); // should remove 2 of the 3
-
- // Move time in the future after the notification effectiveDate
- ((ClockMock) clock).setDeltaFromReality(4000000 + nextReadyTimeIncrementMs * 3);
-
- try {
- await().atMost(10, TimeUnit.SECONDS).until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return eventsReceived >= 2;
- }
- });
- Assert.fail("There should only have been only one event left in the queue we got: " + eventsReceived);
- } catch (Exception e) {
- // expected behavior
- }
- log.info("Received " + eventsReceived + " events");
- queue.stopQueue();
- }
-
- static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
- return new NotificationQueueConfig() {
- @Override
- public boolean isProcessingOff() {
- return off;
- }
-
- @Override
- public int getPrefetchAmount() {
- return 10;
- }
-
- @Override
- public long getSleepTimeMs() {
- return sleepTime;
- }
- };
- }
-
- /*
- public static class TestNotificationQueueModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(Clock.class).to(ClockMock.class).asEagerSingleton();
-
- final IDBI dbi = getDBI();
- bind(IDBI.class).toInstance(dbi);
- final IDBI otherDbi = getDBI();
- bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
- bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
- bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
- }
- }
- */
}