killbill-uncached
Changes
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java 60(+52 -8)
overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java 91(+91 -0)
Details
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index b2e91c5..4090fc6 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,15 +16,25 @@
package com.ning.billing.ovedue.notification;
-import java.io.IOException;
+import java.util.List;
import org.joda.time.DateTime;
+import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.junction.api.Blockable;
+import com.ning.billing.junction.api.Blockable.Type;
import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.dao.NonEntityDao;
+import com.ning.billing.util.entity.dao.EntitySqlDao;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
+import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.Notification;
import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -33,15 +43,18 @@ import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotifi
import com.google.inject.Inject;
public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
+
private static final Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
private final NotificationQueueService notificationQueueService;
+ private final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao;
@Inject
- public DefaultOverdueCheckPoster(
- final NotificationQueueService notificationQueueService) {
- super();
+ public DefaultOverdueCheckPoster(final NotificationQueueService notificationQueueService,
+ final IDBI dbi, final Clock clock,
+ final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
this.notificationQueueService = notificationQueueService;
+ this.transactionalSqlDao = new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao);
}
@Override
@@ -52,15 +65,46 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
- checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
+ final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Type.get(overdueable));
+
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ boolean shouldInsertNewNotification = true;
+
+ // Check if we already have notifications for that key
+ final List<Notification> futureNotificationsForKey = checkOverdueQueue.getFutureNotificationsForKeyFromTransaction(entitySqlDaoWrapperFactory, notificationKey, context);
+ if (futureNotificationsForKey.size() > 0) {
+ // Results are ordered by effective date asc
+ final DateTime earliestExistingNotificationDate = futureNotificationsForKey.get(0).getEffectiveDate();
+
+ final int minIndexToDeleteFrom;
+ if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
+ // We don't have to insert a new one. For sanity, delete any other future notification
+ minIndexToDeleteFrom = 1;
+ shouldInsertNewNotification = false;
+ } else {
+ // We win - we are before any other already recorded. Delete all others.
+ minIndexToDeleteFrom = 0;
+ }
+
+ for (int i = minIndexToDeleteFrom; i < futureNotificationsForKey.size(); i++) {
+ checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationsForKey.get(i).getId(), context);
+ }
+ }
+
+ if (shouldInsertNewNotification) {
+ checkOverdueQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, notificationKey, context);
+ }
+
+ return null;
+ }
+ });
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
- } catch (IOException e) {
- log.error("Failed to serialize notifcationKey for {}", overdueable.toString());
}
}
-
@Override
public void clearNotificationsFor(final Blockable overdueable, final InternalCallContext context) {
final NotificationQueue checkOverdueQueue;
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
index 1c0d441..9768a62 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
@@ -26,5 +26,4 @@ public interface OverdueCheckPoster {
void insertOverdueCheckNotification(Blockable blockable, DateTime futureNotificationTime, final InternalCallContext context);
void clearNotificationsFor(Blockable blockable, final InternalCallContext context);
-
}
diff --git a/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
new file mode 100644
index 0000000..4f4363d
--- /dev/null
+++ b/overdue/src/test/java/com/ning/billing/ovedue/notification/TestDefaultOverdueCheckPoster.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
+import com.ning.billing.junction.api.Blockable;
+import com.ning.billing.junction.api.Blockable.Type;
+import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.jackson.ObjectMapper;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationQueue;
+
+public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private NotificationQueue overdueQueue;
+ private DateTime testReferenceTime;
+
+ @BeforeMethod(groups = "slow")
+ public void setUp() throws Exception {
+ overdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+ DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+ Assert.assertTrue(overdueQueue.isStarted());
+
+ testReferenceTime = clock.getUTCNow();
+ }
+
+ @Test(groups = "slow")
+ public void testShouldntInsertMultipleNotificationsPerOverdueable() throws Exception {
+ final UUID subscriptionId = UUID.randomUUID();
+ final Blockable overdueable = Mockito.mock(Subscription.class);
+ Mockito.when(overdueable.getId()).thenReturn(subscriptionId);
+
+ verifyQueueContent(overdueable, 10, 10);
+ verifyQueueContent(overdueable, 5, 5);
+ verifyQueueContent(overdueable, 15, 5);
+
+ // Check we don't conflict with other overdueables
+ final UUID bundleId = UUID.randomUUID();
+ final Blockable otherOverdueable = Mockito.mock(SubscriptionBundle.class);
+ Mockito.when(otherOverdueable.getId()).thenReturn(bundleId);
+
+ verifyQueueContent(otherOverdueable, 10, 10);
+ verifyQueueContent(otherOverdueable, 5, 5);
+ verifyQueueContent(otherOverdueable, 15, 5);
+
+ // Verify the final content of the queue for each key
+ final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(subscriptionId, Type.SUBSCRIPTION);
+ Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext).size(), 1);
+ final OverdueCheckNotificationKey otherNotificationKey = new OverdueCheckNotificationKey(bundleId, Type.SUBSCRIPTION_BUNDLE);
+ Assert.assertEquals(overdueQueue.getFutureNotificationsForKey(otherNotificationKey, internalCallContext).size(), 1);
+ }
+
+ private void verifyQueueContent(final Blockable overdueable, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
+ final DateTime futureNotificationTime = testReferenceTime.plusDays(nbDaysInFuture);
+ poster.insertOverdueCheckNotification(overdueable, futureNotificationTime, internalCallContext);
+
+ final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable));
+ final List<Notification> notificationsForKey = overdueQueue.getFutureNotificationsForKey(notificationKey, internalCallContext);
+ Assert.assertEquals(notificationsForKey.size(), 1);
+ Assert.assertEquals(notificationsForKey.get(0).getNotificationKey(), objectMapper.writeValueAsString(notificationKey));
+ Assert.assertEquals(notificationsForKey.get(0).getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 4e59096..78c8a35 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -63,9 +63,10 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@SqlQuery
@Mapper(NotificationSqlMapper.class)
- public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
- @Bind("effectiveDate") final Date effectiveDate,
- @InternalTenantContextBinder final InternalTenantContext context);
+ public List<Notification> getFutureNotificationsForKey(@Bind("notificationKey") String key,
+ @Bind("className") String className,
+ @Bind("queueName") String queueName,
+ @InternalTenantContextBinder final InternalTenantContext context);
@SqlUpdate
public void removeNotification(@Bind("id") String id,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 208a211..a414463 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -18,20 +18,14 @@ package com.ning.billing.util.notificationq;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.ObjectType;
import com.ning.billing.util.Hostname;
-
-import com.ning.billing.util.cache.Cachable.CacheType;
import com.ning.billing.util.cache.CacheControllerDispatcher;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.dao.NonEntityDao;
@@ -41,13 +35,11 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
public class DefaultNotificationQueue implements NotificationQueue {
private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
- private final IDBI dbi;
private final NotificationSqlDao dao;
private final String hostname;
@@ -62,7 +54,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
private final NonEntityDao nonEntityDao;
private final CacheControllerDispatcher cacheControllerDispatcher;
-
private volatile boolean isStarted;
public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
@@ -71,7 +62,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
this.svcName = svcName;
this.queueName = queueName;
this.handler = handler;
- this.dbi = dbi;
this.nonEntityDao = nonEntityDao;
this.cacheControllerDispatcher = cacheControllerDispatcher;
this.dao = dbi.onDemand(NotificationSqlDao.class);
@@ -80,7 +70,6 @@ public class DefaultNotificationQueue implements NotificationQueue {
this.objectMapper = new ObjectMapper();
}
-
@Override
public void recordFutureNotification(final DateTime futureNotificationTime,
final NotificationKey notificationKey,
@@ -109,20 +98,31 @@ public class DefaultNotificationQueue implements NotificationQueue {
thisDao.insertNotification(notification, context);
}
-
-
@Override
public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
+ // TODO Pierre Don't we want to check for the notification key class and queue name as well?
dao.removeNotificationsByKey(notificationKey.toString(), context);
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- final Long accountRecordId = nonEntityDao.retrieveRecordIdFromObject(accountId, ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.RECORD_ID));
- if (accountId == null) {
- return ImmutableList.<Notification>of();
- } else {
- return dao.getNotificationForAccountAndDate(accountRecordId, effectiveDate.toDate(), context);
+ public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
+ return getFutureNotificationsForKeyInternal(dao, notificationKey, context);
+ }
+
+ @Override
+ public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final NotificationKey notificationKey, final InternalCallContext context) {
+ final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
+ return getFutureNotificationsForKeyInternal(transactionalNotificationDao, notificationKey, context);
+ }
+
+ private List<Notification> getFutureNotificationsForKeyInternal(final NotificationSqlDao transactionalDao,
+ final NotificationKey notificationKey, final InternalCallContext context) {
+ try {
+ final String json = objectMapper.writeValueAsString(notificationKey);
+ return transactionalDao.getFutureNotificationsForKey(json, notificationKey.getClass().getName(), getFullQName(), context);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
}
}
@@ -132,6 +132,12 @@ public class DefaultNotificationQueue implements NotificationQueue {
}
@Override
+ public void removeNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final UUID notificationId, final InternalCallContext context) {
+ final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
+ transactionalNotificationDao.removeNotification(notificationId.toString(), context);
+ }
+
+ @Override
public String getFullQName() {
return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 58b49c4..f009a6c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -60,13 +60,35 @@ public interface NotificationQueue extends QueueLifecycle {
public void removeNotificationsByKey(final NotificationKey notificationKey,
final InternalCallContext context);
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId,
- final DateTime effectiveDate,
- final InternalCallContext context);
+ /**
+ * Retrieve all future pending notifications for a given key
+ * Results are ordered by effective date asc.
+ *
+ * @param notificationKey notification key to look for
+ * @param context internal call context
+ * @return future notifications matching that key
+ */
+ public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey,
+ final InternalCallContext context);
+
+ /**
+ * Retrieve all future pending notifications for a given key in a transaction.
+ * Results are ordered by effective date asc.
+ *
+ * @param notificationKey notification key to look for
+ * @param context internal call context
+ * @return future notifications matching that key
+ */
+ public List<Notification> getFutureNotificationsForKeyFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final NotificationKey notificationKey,
+ final InternalCallContext context);
public void removeNotification(final UUID notificationId,
final InternalCallContext context);
+ public void removeNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final UUID notificationId,
+ final InternalCallContext context);
/**
* @return the name of that queue
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index d2266b9..71482a8 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -44,27 +44,30 @@ getPendingCountNotifications() ::= <<
;
>>
-getNotificationForAccountAndDate() ::= <<
- select
- record_id
- , id
- , class_name
- , notification_key
- , user_token
- , future_user_token
- , created_date
- , creating_owner
- , effective_date
- , queue_name
- , processing_owner
- , processing_available_date
- , processing_state
- , account_record_id
- , tenant_record_id
- from notifications
- where
- account_record_id = :accountRecordId AND effective_date = :effectiveDate
- ;
+getFutureNotificationsForKey() ::= <<
+select
+ record_id
+ , id
+ , class_name
+ , notification_key
+ , user_token
+ , future_user_token
+ , created_date
+ , creating_owner
+ , effective_date
+ , queue_name
+ , processing_owner
+ , processing_available_date
+ , processing_state
+ , account_record_id
+ , tenant_record_id
+from notifications
+where notification_key = :notificationKey
+and class_name = :className
+and queue_name = :queueName
+and processing_state != 'REMOVED'
+order by effective_date, record_id
+;
>>
removeNotification() ::= <<
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 1ef9b84..7bab177 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -25,7 +25,6 @@ import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
@@ -96,33 +95,6 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
validateDate(notification.getNextAvailableDate(), nextAvailable);
}
- @Test(groups = "slow")
- public void testGetByAccountAndDate() throws InterruptedException {
- final long accountRecordId = 1242L;
- final String notificationKey = UUID.randomUUID().toString();
- final DateTime effDt = new DateTime();
- final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
- accountRecordId, internalCallContext.getTenantRecordId());
- dao.insertNotification(notif1, internalCallContext);
-
- final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
- accountRecordId, internalCallContext.getTenantRecordId());
- dao.insertNotification(notif2, internalCallContext);
-
- List<Notification> notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
- assertEquals(notifications.size(), 2);
- for (final Notification cur : notifications) {
- Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
- dao.removeNotification(cur.getId().toString(), internalCallContext);
- }
-
- notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
- assertEquals(notifications.size(), 2);
- for (final Notification cur : notifications) {
- Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
- }
- }
-
private Notification fetchNotification(final String notificationId) {
return getDBI().withHandle(new HandleCallback<Notification>() {
@Override
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index ac267a7..c62a365 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.TreeSet;
import java.util.UUID;
+import javax.annotation.Nullable;
+
import org.joda.time.DateTime;
import com.ning.billing.util.Hostname;
@@ -85,7 +87,6 @@ public class MockNotificationQueue implements NotificationQueue {
recordFutureNotification(futureNotificationTime, notificationKey, context);
}
-
@Override
public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
final List<Notification> toClearNotifications = new ArrayList<Notification>();
@@ -103,33 +104,40 @@ public class MockNotificationQueue implements NotificationQueue {
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- /*
+ public List<Notification> getFutureNotificationsForKey(final NotificationKey notificationKey, final InternalCallContext context) {
+ return getFutureNotificationsForKeyFromTransaction(null, notificationKey, context);
+ }
+
+ @Override
+ public List<Notification> getFutureNotificationsForKeyFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
+ final NotificationKey notificationKey, final InternalCallContext context) {
final List<Notification> result = new ArrayList<Notification>();
synchronized (notifications) {
- for (Notification cur : notifications) {
- if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
- result.add(cur);
+ for (final Notification notification : notifications) {
+ if (notificationKey.toString().equals(notification.getNotificationKey()) && notification.getEffectiveDate().isAfter(clock.getUTCNow())) {
+ result.add(notification);
}
}
}
+
return result;
- */
- return null;
}
-
@Override
public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+ removeNotificationFromTransaction(null, notificationId, context);
+ }
+
+ @Override
+ public void removeNotificationFromTransaction(@Nullable final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final UUID notificationId, final InternalCallContext context) {
synchronized (notifications) {
- for (Notification cur : notifications) {
+ for (final Notification cur : notifications) {
if (cur.getId().equals(notificationId)) {
notifications.remove(cur);
break;
}
}
}
-
}
@Override
@@ -169,12 +177,7 @@ public class MockNotificationQueue implements NotificationQueue {
return isStarted;
}
-
public List<Notification> getReadyNotifications() {
- final int result;
- final List<Notification> processedNotifications = new ArrayList<Notification>();
- final List<Notification> oldNotifications = new ArrayList<Notification>();
-
final List<Notification> readyNotifications = new ArrayList<Notification>();
synchronized (notifications) {
for (final Notification cur : notifications) {