killbill-memoizeit

Changes

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 24c7cca..ab0cd11 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -625,6 +625,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         });
     }
 
+    @Override
     public void repair(final UUID accountId, final UUID bundleId, final List<SubscriptionDataRepair> inRepair, final CallContext context) {
         subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
             @Override
@@ -673,7 +674,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                            Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, null, notificationKey);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 06032f7..c1a2793 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -411,11 +411,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
                                                                                                      Engine.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, null, notificationKey);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
         } catch (IOException e) {
-            throw new RuntimeException(e);            
+            throw new RuntimeException(e);
         }
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 75ef1aa..11dc976 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -160,8 +160,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
                     audits.addAll(createAudits(TableName.INVOICE_ITEMS, recordIdList));
 
                     final List<InvoiceItem> recurringInvoiceItems = invoice.getInvoiceItems(RecurringInvoiceItem.class);
-
-                    notifyOfFutureBillingEvents(transactional, invoice, recurringInvoiceItems, billCycleDay);
+                    notifyOfFutureBillingEvents(transactional, invoice.getAccountId(), billCycleDay, recurringInvoiceItems);
 
                     final List<InvoicePayment> invoicePayments = invoice.getPayments();
                     final InvoicePaymentSqlDao invoicePaymentSqlDao = transactional.become(InvoicePaymentSqlDao.class);
@@ -507,7 +506,8 @@ public class DefaultInvoiceDao implements InvoiceDao {
         invoice.addPayments(invoicePayments);
     }
 
-    private void notifyOfFutureBillingEvents(final InvoiceSqlDao dao, final Invoice invoice, final List<InvoiceItem> invoiceItems, final int billCycleDay) {
+
+    private void notifyOfFutureBillingEvents(final InvoiceSqlDao dao, final UUID accountId, final int billCycleDay, final List<InvoiceItem> invoiceItems) {
         DateTime nextBCD = null;
         UUID subscriptionForNextBCD = null;
         for (final InvoiceItem item : invoiceItems) {
@@ -523,7 +523,6 @@ public class DefaultInvoiceDao implements InvoiceDao {
                 }
             }
         }
-
         // We need to be notified if and only if the maximum end date of the invoiced recurring items is equal
         // to the next bill cycle day.
         // We take the maximum because we're guaranteed to have invoiced all subscriptions up until that date
@@ -531,8 +530,7 @@ public class DefaultInvoiceDao implements InvoiceDao {
         // Also, we only need to get notified on the BDC. For other invoice events (e.g. phase changes),
         // we'll be notified by entitlement.
         if (subscriptionForNextBCD != null && nextBCD != null && nextBCD.getDayOfMonth() == billCycleDay) {
-            final UUID accountId = invoice.getAccountId();
-            nextBillingDatePoster.insertNextBillingNotification(dao, subscriptionForNextBCD, nextBCD);
+            nextBillingDatePoster.insertNextBillingNotification(dao, accountId, subscriptionForNextBCD, nextBCD);
         }
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index f2521de..5428580 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -45,18 +45,18 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
     }
 
     @Override
-    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
+    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID acountId, final UUID subscriptionId, final DateTime futureNotificationTime) {
         final NotificationQueue nextBillingQueue;
         try {
             nextBillingQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
             log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
 
-            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NextBillingDateNotificationKey(subscriptionId));
+            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, acountId, new NextBillingDateNotificationKey(subscriptionId));
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
         } catch (IOException e) {
-            log.error("Failed to serialize notficationKey for subscriptionId {}", subscriptionId);            
+            log.error("Failed to serialize notficationKey for subscriptionId {}", subscriptionId);
         }
     }
 }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
index 279c61a..60f8ec4 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -23,7 +23,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 public interface NextBillingDatePoster {
 
-    void insertNextBillingNotification(Transmogrifier transactionalDao,
+    void insertNextBillingNotification(Transmogrifier transactionalDao, UUID accountId,
                                        UUID subscriptionId, DateTime futureNotificationTime);
 
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
index 1079a2f..5742247 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
@@ -23,7 +23,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 public class MockNextBillingDatePoster implements NextBillingDatePoster {
     @Override
-    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID subscriptionId, final DateTime futureNotificationTime) {
+    public void insertNextBillingNotification(final Transmogrifier transactionalDao, final UUID accountId, final UUID subscriptionId, final DateTime futureNotificationTime) {
         // do nothing
     }
 }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index c4bcc5a..e2ac16c 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -112,6 +112,7 @@ public class TestNextBillingDateNotifier {
         //TestApiBase.loadSystemPropertiesFromClasspath("/entitlement.properties");
         final Injector g = Guice.createInjector(Stage.PRODUCTION, new AbstractModule() {
 
+            @Override
             protected void configure() {
                 install(new MockClockModule());
                 install(new BusModule(BusType.MEMORY));
@@ -164,6 +165,8 @@ public class TestNextBillingDateNotifier {
 
     @Test(enabled = true, groups = "slow")
     public void testInvoiceNotifier() throws Exception {
+
+        final UUID accountId = UUID.randomUUID();
         final UUID subscriptionId = new UUID(0L, 1L);
         final DateTime now = new DateTime();
         final DateTime readyTime = now.plusMillis(2000);
@@ -178,7 +181,7 @@ public class TestNextBillingDateNotifier {
             public Void inTransaction(final DummySqlTest transactional,
                                       final TransactionStatus status) throws Exception {
 
-                poster.insertNextBillingNotification(transactional, subscriptionId, readyTime);
+                poster.insertNextBillingNotification(transactional, accountId, subscriptionId, readyTime);
                 return null;
             }
         });
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 79cfdf9..ad11522 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -50,11 +50,11 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
             log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
 
-            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId()));
+            checkOverdueQueue.recordFutureNotification(futureNotificationTime, null, new OverdueCheckNotificationKey(overdueable.getId()));
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
         } catch (IOException e) {
-            log.error("Failed to serialize notifcationKey for {}", overdueable.toString());            
+            log.error("Failed to serialize notifcationKey for {}", overdueable.toString());
         }
     }
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index 292583e..cd48c67 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Copyright 2010-2011 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
@@ -66,7 +66,7 @@ public abstract class BaseRetryService implements RetryService {
         },
         config);
     }
-    
+
     @Override
     public void start() {
         retryQueue.startQueue();
@@ -80,6 +80,7 @@ public abstract class BaseRetryService implements RetryService {
         }
     }
 
+    @Override
     public abstract String getQueueName();
 
 
@@ -125,9 +126,9 @@ public abstract class BaseRetryService implements RetryService {
                 final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
                 if (retryQueue != null) {
                     if (transactionalDao == null) {
-                        retryQueue.recordFutureNotification(timeOfRetry, key);
+                        retryQueue.recordFutureNotification(timeOfRetry, null, key);
                     } else {
-                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key);
+                        retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, null, key);
                     }
                 }
             } catch (NoSuchNotificationQueue e) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index dc3ee4d..0e56efd 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -51,6 +51,13 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     @Mapper(NotificationSqlMapper.class)
     public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("owner") String owner, @Bind("max") int max, @Bind("queueName") String queueName);
 
+    @SqlQuery
+    @Mapper(NotificationSqlMapper.class)
+    public List<Notification> getNotificationForAccountAndDate(@Bind("accountId") final String accountId, @Bind("effectiveDate") final Date effectiveDate);
+
+    @SqlUpdate
+    public void removeNotification(@Bind("id") String id);
+
     @SqlUpdate
     public int claimNotification(@Bind("owner") String owner, @Bind("nextAvailable") Date nextAvailable,
                                  @Bind("id") String id, @Bind("now") Date now);
@@ -73,7 +80,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("id", evt.getId().toString());
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
-            stmt.bind("className", evt.getNotificationKeyClass());            
+            stmt.bind("className", evt.getNotificationKeyClass());
+            stmt.bind("accountId", evt.getAccountId() != null ? evt.getAccountId().toString() : null);
             stmt.bind("notificationKey", evt.getNotificationKey());
             stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
             stmt.bind("queueName", evt.getQueueName());
@@ -92,8 +100,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final Long ordering = r.getLong("record_id");
             final UUID id = getUUID(r, "id");
             final String createdOwner = r.getString("creating_owner");
-            final String className = r.getString("class_name");            
+            final String className = r.getString("class_name");
             final String notificationKey = r.getString("notification_key");
+            final UUID accountId = getUUID(r, "account_id");
             final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDate(r, "effective_date");
             final DateTime nextAvailableDate = getDate(r, "processing_available_date");
@@ -101,7 +110,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
 
             return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
-                                           processingState, className, notificationKey, effectiveDate);
+                                           processingState, className, notificationKey, accountId, effectiveDate);
 
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 1d74210..70c49c4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -32,11 +32,12 @@ public class DefaultNotification extends EntityBase implements Notification {
     private final String notificationKeyClass;
     private final String notificationKey;
     private final DateTime effectiveDate;
+    private final UUID accountId;
 
 
     public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName, final DateTime nextAvailableDate,
                                final PersistentQueueEntryLifecycleState lifecycleState,
-                               final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
+                               final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate) {
         super(id);
         this.ordering = ordering;
         this.owner = owner;
@@ -46,11 +47,12 @@ public class DefaultNotification extends EntityBase implements Notification {
         this.lifecycleState = lifecycleState;
         this.notificationKeyClass = notificationKeyClass;
         this.notificationKey = notificationKey;
+        this.accountId = accountId;
         this.effectiveDate = effectiveDate;
     }
 
-    public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
-        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKeyClass, notificationKey, effectiveDate);
+    public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate) {
+        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKeyClass, notificationKey, accountId, effectiveDate);
     }
 
     @Override
@@ -97,7 +99,7 @@ public class DefaultNotification extends EntityBase implements Notification {
         return notificationKeyClass;
     }
 
-    
+
     @Override
     public String getNotificationKey() {
         return notificationKey;
@@ -117,4 +119,9 @@ public class DefaultNotification extends EntityBase implements Notification {
     public String getCreatedOwner() {
         return createdOwner;
     }
+
+    @Override
+    public UUID getAccountId() {
+        return accountId;
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index c89f908..246b187 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
@@ -69,23 +70,26 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
-        recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao);
+    public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId, final NotificationKey notificationKey) throws IOException {
+        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, dao);
     }
 
     @Override
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
                                                         final DateTime futureNotificationTime,
+                                                        final UUID accountId,
                                                         final NotificationKey notificationKey) throws IOException {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
-        recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao);
+        recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, transactionalNotificationDao);
     }
 
     private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
-                                                  final NotificationKey notificationKey,
+            final UUID accountId,
+            final NotificationKey notificationKey,
+
                                                   final NotificationSqlDao thisDao) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
+        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime);
         thisDao.insertNotification(notification);
     }
 
@@ -133,4 +137,14 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     public void removeNotificationsByKey(final NotificationKey notificationKey) {
         dao.removeNotificationsByKey(notificationKey.toString());
     }
+
+    @Override
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate) {
+        return dao.getNotificationForAccountAndDate(accountId.toString(), effectiveDate.toDate());
+    }
+
+    @Override
+    public void removeNotification(UUID notificationId) {
+        dao.removeNotification(notificationId.toString());
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 2ecea54..8e578c9 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.entity.Entity;
@@ -25,10 +27,12 @@ public interface Notification extends PersistentQueueEntryLifecycle, Entity {
     public Long getOrdering();
 
     public String getNotificationKeyClass();
-    
+
     public String getNotificationKey();
 
     public DateTime getEffectiveDate();
 
     public String getQueueName();
+
+    public UUID getAccountId();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 678da46..42cacea 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -17,6 +17,8 @@
 package com.ning.billing.util.notificationq;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -31,7 +33,7 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param futureNotificationTime the time at which the notification is ready
      * @param notificationKey        the key for that notification
      */
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey)
+    public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId, final NotificationKey notificationKey)
         throws IOException;
 
     /**
@@ -42,7 +44,9 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param notificationKey        the key for that notification
      */
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
-                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey)
+                                                        final DateTime futureNotificationTime,
+                                                        final UUID accountId,
+                                                        final NotificationKey notificationKey)
         throws IOException;
 
 
@@ -53,6 +57,13 @@ public interface NotificationQueue extends QueueLifecycle {
      */
     public void removeNotificationsByKey(final NotificationKey notificationKey);
 
+
+
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate);
+
+    public void removeNotification(final UUID notificationId);
+
+
     /**
      * This is only valid when the queue has been configured with isNotificationProcessingOff is true
      * In which case, it will callback users for all the ready notifications.
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index a0e7dad..1513ec3 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -103,6 +103,7 @@ CREATE TABLE notifications (
     id char(36) NOT NULL,
     created_date datetime NOT NULL,
     class_name varchar(256) NOT NULL,
+    account_id  char(36) NOT NULL,
 	notification_key varchar(2048) NOT NULL,
 	creating_owner char(50) NOT NULL,
     effective_date datetime NOT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 3c81e21..8cb0a27 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -5,6 +5,7 @@ getReadyNotifications() ::= <<
       record_id
       , id
       , class_name
+      , account_id
       , notification_key
       , created_date
       , creating_owner
@@ -30,6 +31,35 @@ getReadyNotifications() ::= <<
     ;
 >>
 
+getNotificationForAccountAndDate() ::= <<
+   select
+     record_id
+     , id
+     , class_name
+     , account_id
+     , notification_key
+     , created_date
+     , creating_owner
+     , effective_date
+     , queue_name
+     , processing_owner
+     , processing_available_date
+     , processing_state
+   from notifications
+   where
+   account_id = :accountId AND effective_date = :effectiveDate
+   ;
+>>
+
+removeNotification()  ::= <<
+  update notifications
+    set
+  processing_state = 'REMOVED'
+    where
+  id = :id
+; 
+>>
+
 claimNotification() ::= <<
     update notifications
     set
@@ -66,6 +96,7 @@ insertNotification() ::= <<
     insert into notifications (
       id
       , class_name
+      , account_id
       , notification_key
       , created_date
       , creating_owner
@@ -77,6 +108,7 @@ insertNotification() ::= <<
     ) values (
       :id
       , :className
+      , :accountId
       , :notificationKey
       , :createdDate
       , :creatingOwner
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index d7141d3..5a6c4a1 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -40,6 +40,7 @@ import com.ning.billing.util.io.IOUtils;
 import com.ning.billing.util.notificationq.DefaultNotification;
 import com.ning.billing.util.notificationq.Notification;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+import com.ning.billing.util.queue.PersistentQueueBase;
 import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import static org.testng.Assert.assertEquals;
@@ -48,6 +49,8 @@ import static org.testng.Assert.assertNotNull;
 @Test(groups = "slow")
 @Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
 public class TestNotificationSqlDao {
+
+    private static final UUID accountId = UUID.randomUUID();
     private static final String hostname = "Yop";
 
     @Inject
@@ -102,7 +105,7 @@ public class TestNotificationSqlDao {
 
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, effDt);
+        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
         dao.insertNotification(notif);
 
         Thread.sleep(1000);
@@ -141,6 +144,36 @@ public class TestNotificationSqlDao {
 
     }
 
+
+
+    @Test
+    public void testGetByAccountAndDate() throws InterruptedException {
+
+
+        final String notificationKey = UUID.randomUUID().toString();
+        final DateTime effDt = new DateTime();
+        final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
+        dao.insertNotification(notif1);
+
+        final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
+        dao.insertNotification(notif2);
+
+
+        List<Notification> notifications =  dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate());
+        assertEquals(notifications.size(), 2);
+        for (Notification cur : notifications) {
+            Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
+            dao.removeNotification(cur.getId().toString());
+        }
+
+        notifications =  dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate());
+        assertEquals(notifications.size(), 2);
+        for (Notification cur : notifications) {
+            Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
+        }
+    }
+
+
     private Notification fetchNotification(final String notificationId) {
         final Notification res = dbi.withHandle(new HandleCallback<Notification>() {
 
@@ -149,7 +182,8 @@ public class TestNotificationSqlDao {
                 final Notification res = handle.createQuery("   select" +
                                                                     " record_id " +
                                                                     ", id" +
-                                                                    ", class_name" +                                                                    
+                                                                    ", class_name" +
+                                                                    ", account_id" +
                                                                     ", notification_key" +
                                                                     ", created_date" +
                                                                     ", creating_owner" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index edb4f2e..259952c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -51,17 +52,17 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+    public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId, final NotificationKey notificationKey) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
+        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime);
         synchronized (notifications) {
             notifications.add(notification);
         }
     }
 
     @Override
-    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
-        recordFutureNotification(futureNotificationTime, notificationKey);
+    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime futureNotificationTime, final UUID accountId, final NotificationKey notificationKey) throws IOException {
+        recordFutureNotification(futureNotificationTime, accountId, notificationKey);
     }
 
     public List<Notification> getPendingEvents() {
@@ -98,7 +99,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
             final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
                                                                                       "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
                                                                                       PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
-                                                                                      cur.getNotificationKey(), cur.getEffectiveDate());
+                                                                                      cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate());
             oldNotifications.add(cur);
             processedNotifications.add(processedNotification);
         }
@@ -131,4 +132,17 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
             }
         }
     }
+
+    @Override
+    public List<Notification> getNotificationForAccountAndDate(UUID accountId,
+            DateTime effectiveDate) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void removeNotification(UUID notificationId) {
+        // TODO Auto-generated method stub
+
+    }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 11db32e..c9ba744 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -63,6 +63,9 @@ import static org.testng.Assert.assertEquals;
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue {
     Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
+
+    private static final UUID accountId = UUID.randomUUID();
+
     @Inject
     private IDBI dbi;
 
@@ -86,9 +89,9 @@ public class TestNotificationQueue {
         helper.initDb(testDdl);
     }
 
-    
+
     private final static class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
-        
+
         private final String value;
 
         @JsonCreator
@@ -106,8 +109,8 @@ public class TestNotificationQueue {
             return value.compareTo(arg0.value);
         }
     }
-    
-    
+
+
     @BeforeSuite(groups = "slow")
     public void setup() throws Exception {
         startMysql();
@@ -184,7 +187,7 @@ public class TestNotificationQueue {
 
                 transactional.insertDummy(obj);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                        readyTime, notificationKey);
+                        readyTime, accountId, notificationKey);
                 log.info("Posted key: " + notificationKey);
 
                 return null;
@@ -245,7 +248,7 @@ public class TestNotificationQueue {
 
                     transactional.insertDummy(obj);
                     queue.recordFutureNotificationFromTransaction(transactional,
-                            now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+                            now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), accountId, notificationKey);
                     return null;
                 }
             });
@@ -348,7 +351,7 @@ public class TestNotificationQueue {
         final NotificationKey notificationKeyFred = new TestNotificationKey("Fred");
 
 
-        final NotificationKey notificationKeyBarney = new TestNotificationKey("Barney"); 
+        final NotificationKey notificationKeyBarney = new TestNotificationKey("Barney");
 
         expectedNotificationsFred.put(notificationKeyFred, Boolean.FALSE);
         expectedNotificationsFred.put(notificationKeyBarney, Boolean.FALSE);
@@ -362,10 +365,10 @@ public class TestNotificationQueue {
 
                 transactional.insertDummy(obj);
                 queueFred.recordFutureNotificationFromTransaction(transactional,
-                        readyTime, notificationKeyFred);
+                        readyTime, accountId, notificationKeyFred);
                 log.info("posted key: " + notificationKeyFred.toString());
                 queueBarney.recordFutureNotificationFromTransaction(transactional,
-                        readyTime, notificationKeyBarney);
+                        readyTime, accountId, notificationKeyBarney);
                 log.info("posted key: " + notificationKeyBarney.toString());
 
                 return null;
@@ -444,11 +447,11 @@ public class TestNotificationQueue {
                     final TransactionStatus status) throws Exception {
 
                 queue.recordFutureNotificationFromTransaction(transactional,
-                        start.plus(nextReadyTimeIncrementMs), notificationKey);
+                        start.plus(nextReadyTimeIncrementMs), accountId, notificationKey);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                        start.plus(2 * nextReadyTimeIncrementMs), notificationKey);
+                        start.plus(2 * nextReadyTimeIncrementMs), accountId, notificationKey);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                        start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
+                        start.plus(3 * nextReadyTimeIncrementMs), accountId, notificationKey2);
                 return null;
             }
         });