killbill-aplcache

Adding a notification queue for Overdue plus adding the ability

4/6/2012 6:03:25 PM

Changes

overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicatorBundle.java 84(+0 -84)

Details

diff --git a/api/src/main/java/com/ning/billing/overdue/OverdueService.java b/api/src/main/java/com/ning/billing/overdue/OverdueService.java
index 068bc63..8841046 100644
--- a/api/src/main/java/com/ning/billing/overdue/OverdueService.java
+++ b/api/src/main/java/com/ning/billing/overdue/OverdueService.java
@@ -16,9 +16,6 @@
 
 package com.ning.billing.overdue;
 
-import com.ning.billing.catalog.api.overdue.OverdueError;
-import com.ning.billing.catalog.api.overdue.OverdueState;
-import com.ning.billing.catalog.api.overdue.Overdueable;
 import com.ning.billing.lifecycle.KillbillService;
 
 public interface OverdueService extends KillbillService {
@@ -26,5 +23,4 @@ public interface OverdueService extends KillbillService {
 
     public OverdueUserApi getUserApi();
     
-    public <T extends Overdueable> OverdueState<T> refresh(T overdueable) throws OverdueError;
 }
diff --git a/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java b/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
index e2ead75..0cfbb6b 100644
--- a/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
+++ b/api/src/main/java/com/ning/billing/overdue/OverdueUserApi.java
@@ -23,8 +23,6 @@ import com.ning.billing.catalog.api.overdue.Overdueable;
 
 public interface OverdueUserApi {
 
-    public <T extends Overdueable> OverdueState<T>  getOverdueStateFor(T overdueable) throws OverdueError;
-
     public <T extends Overdueable> OverdueState<T> refreshOverdueStateFor(T overdueable) throws OverdueError;
 
     public <T extends Overdueable> void setOverrideBillingStateForAccount(T overdueable, BillingState<T> state) throws OverdueError;
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
new file mode 100644
index 0000000..cbcbd9f
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.config.InvoiceConfig;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class DefaultOverdueCheckNotifier implements  OverdueCheckNotifier {
+
+    private final static Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
+
+    public static final String OVERDUE_CHECK_NOTIFIER_QUEUE = "overdue-check-queue";
+
+    private final NotificationQueueService notificationQueueService;
+	private final InvoiceConfig config;
+
+    private NotificationQueue overdueQueue;
+	private final OverdueListener listener;
+
+    @Inject
+	public DefaultOverdueCheckNotifier(NotificationQueueService notificationQueueService,
+			InvoiceConfig config, OverdueListener listener){
+		this.notificationQueueService = notificationQueueService;
+		this.config = config;
+        this.listener = listener;
+	}
+
+    @Override
+    public void initialize() {
+		try {
+            overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+            		OVERDUE_CHECK_NOTIFIER_QUEUE,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey, DateTime eventDate) {
+                	try {
+                 		UUID key = UUID.fromString(notificationKey);
+                        processEvent(key , eventDate);
+                   	} catch (IllegalArgumentException e) {
+                		log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+                		return;
+                	}
+
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isEventProcessingOff();
+                }
+                @Override
+                public long getNotificationSleepTimeMs() {
+                    return config.getNotificationSleepTimeMs();
+                }
+                @Override
+                public int getDaoMaxReadyEvents() {
+                    return config.getDaoMaxReadyEvents();
+                }
+                @Override
+                public long getDaoClaimTimeMs() {
+                    return config.getDaoClaimTimeMs();
+                }
+            });
+        } catch (NotificationQueueAlreadyExists e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void start() {
+    	overdueQueue.startQueue();
+    }
+
+    @Override
+    public void stop() {
+        if (overdueQueue != null) {
+        	overdueQueue.stopQueue();
+        }
+    }
+
+    private void processEvent(UUID overdueableId, DateTime eventDateTime) {
+        listener.handleNextOverdueCheck(overdueableId, eventDateTime); 
+    }
+
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
new file mode 100644
index 0000000..e699414
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
+    private final static Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
+
+	private final NotificationQueueService notificationQueueService;
+
+	@Inject
+    public DefaultOverdueCheckPoster(
+			NotificationQueueService notificationQueueService) {
+		super();
+		this.notificationQueueService = notificationQueueService;
+	}
+
+	@Override
+	public void insertOverdueCheckNotification(final Transmogrifier transactionalDao, final Overdueable overdueable, final DateTime futureNotificationTime) {
+    	NotificationQueue checkOverdueQueue;
+		try {
+			checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+					DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+			 log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
+
+	            checkOverdueQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey(){
+	                @Override
+	                public String toString() {
+	                    return overdueable.getId().toString();
+	                }
+	    	    });
+		} catch (NoSuchNotificationQueue e) {
+			log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
+		}
+    }
+	
+	public void clearNotificationEventsFor(final Overdueable overdueable) {
+	    NotificationQueue checkOverdueQueue;
+        try {
+            checkOverdueQueue = notificationQueueService.getNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
+                DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
+            checkOverdueQueue.clearNotificationsFor(overdueable.getId());
+        } catch (NoSuchNotificationQueue e) {
+            log.error("Attempting to clear items from a non-existent queue (DefaultOverdueCheck).", e);
+        }
+	}
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java
new file mode 100644
index 0000000..7ef6ab8
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckNotifier.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+
+public interface OverdueCheckNotifier {
+
+    public void initialize();
+
+    public void start();
+
+    public void stop();
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
new file mode 100644
index 0000000..8a0ad68
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueCheckPoster.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.catalog.api.overdue.Overdueable;
+
+public interface OverdueCheckPoster {
+
+	void insertOverdueCheckNotification(Transmogrifier transactionalDao,
+			Overdueable overdueable, DateTime futureNotificationTime);
+
+}
\ No newline at end of file
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java
new file mode 100644
index 0000000..e3ddcf3
--- /dev/null
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/OverdueListener.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.ovedue.notification;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class OverdueListener {
+
+    public void handleNextOverdueCheck(UUID subscriptionId, DateTime eventDateTime) {
+        //TODO
+        
+    }
+
+}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java b/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
index cec80ad..fd72cee 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/api/DefaultOverdueUserApi.java
@@ -31,38 +31,26 @@ import com.ning.billing.catalog.api.overdue.Overdueable;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.overdue.OverdueService;
 import com.ning.billing.overdue.OverdueUserApi;
+import com.ning.billing.overdue.wrapper.OverdueWrapper;
+import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
 import com.ning.billing.util.overdue.dao.OverdueAccessDao;
 
 public class DefaultOverdueUserApi implements OverdueUserApi{
 
-    private OverdueService service;
-    private CatalogService catalogService;
-    private OverdueAccessDao accessDao;
-
+    
+    private final OverdueWrapperFactory factory;
+   
     @Inject
-    public DefaultOverdueUserApi(OverdueService service, CatalogService catalogService, OverdueAccessDao accessDao) {
-        this.service = service;
-        this.catalogService = catalogService;
-        this.accessDao = accessDao;
+    public DefaultOverdueUserApi(OverdueWrapperFactory factory) {
+        this.factory = factory;
     }
     
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable) throws OverdueError {
-        try {
-            String stateName = accessDao.getOverdueStateNameFor(overdueable);
-            StaticCatalog catalog = catalogService.getCurrentCatalog();
-            OverdueStateSet<SubscriptionBundle> states = catalog.currentBundleOverdueStateSet();
-            return (OverdueState<T>) states.findState(stateName);
-        } catch (CatalogApiException e) {
-            throw new OverdueError(e, ErrorCode.OVERDUE_CAT_ERROR_ENCOUNTERED,overdueable.getId(), overdueable.getClass().getSimpleName());
-        }
-    }
-
     @Override
     public <T extends Overdueable> OverdueState<T> refreshOverdueStateFor(T overdueable) throws OverdueError {
-        return service.refresh(overdueable);     
+        OverdueWrapper<T> wrapper = factory.createOverdueWrapperFor(overdueable);
+        return wrapper.refresh();
     } 
+ 
 
     @Override
     public <T extends Overdueable> void setOverrideBillingStateForAccount(
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index d49d0e6..1323e55 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -16,12 +16,64 @@
 
 package com.ning.billing.overdue.applicator;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 
+import com.google.inject.Inject;
 import com.ning.billing.catalog.api.overdue.OverdueState;
 import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.overdue.dao.OverdueDao;
+import com.ning.billing.util.clock.Clock;
 
-public interface OverdueStateApplicator<T extends Overdueable>{
+public class OverdueStateApplicator<T extends Overdueable>{
+
+    private final OverdueDao overdueDao;
+    private final Clock clock;
+
+
+
+    @Inject
+    public OverdueStateApplicator(OverdueDao overdueDao, Clock clock) {
+        this.overdueDao = overdueDao;
+        this.clock = clock;
+    }
+
+    public void apply(T overdueable, OverdueState<T> previousOverdueState, OverdueState<T> nextOverdueState, DateTime timeOfNextCheck) {
+        if(previousOverdueState.getName().equals(nextOverdueState.getName())) {
+            return; // nothing to do
+        }
+        
+        storeNewState(overdueable, nextOverdueState);
+  
+        if(timeOfNextCheck != null && !nextOverdueState.isClearState()) {
+            createFutureNotification(overdueable, timeOfNextCheck);
+        }
+
+        if(nextOverdueState.isClearState()) {
+            clear(overdueable);
+        }
+        
+        //If new state is clear state reset next events and override table
+        throw new NotImplementedException();
+    }
+
+ 
+    protected void storeNewState(T overdueable, OverdueState<T> nextOverdueState) {
+       overdueDao.setOverdueState(overdueable, nextOverdueState, clock);
+    }
+    
+    protected void createFutureNotification(T overdueable,
+            DateTime timeOfNextCheck) {
+        // TODO Auto-generated method stub
+        
+    }
+
+
+    
+    protected void clear(T overdueable) {
+        // Clear future notification checks
+        // Clear any overrides
+        
+    }
 
-    public void apply(T overdueable, OverdueState<T> previousOverdueState, OverdueState<T> nextOverdueState, DateTime timeOfNextCheck);
 }
diff --git a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
index 75286de..fe69e34 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueDao.java
@@ -22,6 +22,6 @@ import com.ning.billing.util.clock.Clock;
 
 public interface OverdueDao {
 
-    <T extends Overdueable> void  setOverdueStateForBundle(T overdueable, OverdueState<T> newOverdueState, Clock clock);
+    <T extends Overdueable> void  setOverdueState(T overdueable, OverdueState<T> newOverdueState, Clock clock);
 
 }
\ No newline at end of file
diff --git a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
index dac4050..bd77283 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/dao/OverdueSqlDao.java
@@ -32,7 +32,7 @@ public interface OverdueSqlDao extends OverdueDao {
 
     @Override
     @SqlUpdate
-    public abstract <T extends Overdueable> void setOverdueStateForBundle(
+    public abstract <T extends Overdueable> void setOverdueState(
             @Bind(binder = OverdueableBinder.class) T overdueable, 
             @Bind(binder = OverdueStateBinder.class) OverdueState<T> overdueState,
             @Bind(binder = CurrentTimeBinder.class) Clock clock) ;
diff --git a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
index 78fc37d..57c69d2 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
@@ -17,22 +17,16 @@
 package com.ning.billing.overdue.service;
 
 import com.google.inject.Inject;
-import com.ning.billing.catalog.api.overdue.OverdueError;
-import com.ning.billing.catalog.api.overdue.OverdueState;
-import com.ning.billing.catalog.api.overdue.Overdueable;
 import com.ning.billing.overdue.OverdueService;
 import com.ning.billing.overdue.OverdueUserApi;
-import com.ning.billing.overdue.wrapper.OverdueWrapper;
-import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
 
 public class DefaultOverdueService implements OverdueService {
     public static final String OVERDUE_SERVICE_NAME = "overdue-service";
-    
-    private final OverdueWrapperFactory factory;
+    private OverdueUserApi userApi;
 
     @Inject
-    public DefaultOverdueService(OverdueWrapperFactory factory) {
-        this.factory = factory;
+    public DefaultOverdueService(OverdueUserApi userApi){
+        this.userApi = userApi;
     }
     
     @Override
@@ -42,13 +36,8 @@ public class DefaultOverdueService implements OverdueService {
 
     @Override
     public OverdueUserApi getUserApi() {
-        return null;
+        return userApi;
     }
 
-    @Override
-    public <T extends Overdueable> OverdueState<T> refresh(T overdueable) throws OverdueError {
-        OverdueWrapper<T> wrapper = factory.createOverdueWrapperFor(overdueable);
-        return wrapper.refresh();
-    } 
- 
+   
 }
diff --git a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
index 1b7feeb..cc943c8 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapper.java
@@ -21,20 +21,20 @@ import com.ning.billing.catalog.api.overdue.OverdueError;
 import com.ning.billing.catalog.api.overdue.OverdueState;
 import com.ning.billing.catalog.api.overdue.OverdueStateSet;
 import com.ning.billing.catalog.api.overdue.Overdueable;
-import com.ning.billing.overdue.OverdueUserApi;
 import com.ning.billing.overdue.applicator.OverdueStateApplicator;
 import com.ning.billing.overdue.calculator.BillingStateCalculator;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.overdue.OverdueAccessApi;
 
 public class OverdueWrapper<T extends Overdueable> {
     private final T overdueable;
-    private final OverdueUserApi api;
+    private final OverdueAccessApi api;
     private final Clock clock;
     private final OverdueStateSet<T> overdueStateSet;
     private final BillingStateCalculator<T> billingStateCalcuator;
     private final OverdueStateApplicator<T> overdueStateApplicator;
 
-    public OverdueWrapper(T overdueable, OverdueUserApi api,
+    public OverdueWrapper(T overdueable, OverdueAccessApi api,
             OverdueStateSet<T> overdueStateSet,
             Clock clock,
             BillingStateCalculator<T> billingStateCalcuator,
@@ -49,7 +49,7 @@ public class OverdueWrapper<T extends Overdueable> {
 
     public OverdueState<T> refresh() throws OverdueError {
         BillingState<T> billingState = billingStateCalcuator.calculateBillingState(overdueable);
-        OverdueState<T> previousOverdueStateName = api.getOverdueStateFor(overdueable);
+        String previousOverdueStateName = api.getOverdueStateNameFor(overdueable);
         OverdueState<T> nextOverdueState = overdueStateSet.calculateOverdueState(billingState, clock.getUTCNow());
         if(!previousOverdueStateName.equals(nextOverdueState.getName())) {
             overdueStateApplicator.apply(overdueable, nextOverdueState, nextOverdueState, overdueStateSet.dateOfNextCheck(billingState, clock.getUTCNow())); 
diff --git a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
index 76b3dd0..1df769d 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/wrapper/OverdueWrapperFactory.java
@@ -23,21 +23,22 @@ import com.ning.billing.catalog.api.CatalogService;
 import com.ning.billing.catalog.api.overdue.OverdueError;
 import com.ning.billing.catalog.api.overdue.Overdueable;
 import com.ning.billing.entitlement.api.user.SubscriptionBundle;
-import com.ning.billing.overdue.OverdueUserApi;
-import com.ning.billing.overdue.applicator.OverdueStateApplicatorBundle;
+import com.ning.billing.overdue.applicator.OverdueStateApplicator;
 import com.ning.billing.overdue.calculator.BillingStateCalculatorBundle;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.overdue.OverdueAccessApi;
 
 public class OverdueWrapperFactory {
 
     private final CatalogService catalogService;
     private final BillingStateCalculatorBundle billingStateCalcuatorBundle;
-    private final OverdueStateApplicatorBundle overdueStateApplicatorBundle;
-    private final OverdueUserApi api;
+    private final OverdueStateApplicator<SubscriptionBundle> overdueStateApplicatorBundle;
+    private final OverdueAccessApi api;
     private final Clock clock;
 
     @Inject
-    public OverdueWrapperFactory(OverdueUserApi api, CatalogService catalogService, Clock clock, BillingStateCalculatorBundle billingStateCalcuatorBundle, OverdueStateApplicatorBundle overdueStateApplicatorBundle) {
+    public OverdueWrapperFactory(OverdueAccessApi api, CatalogService catalogService, Clock clock, 
+            BillingStateCalculatorBundle billingStateCalcuatorBundle, OverdueStateApplicator<SubscriptionBundle> overdueStateApplicatorBundle) {
         this.billingStateCalcuatorBundle = billingStateCalcuatorBundle;
         this.overdueStateApplicatorBundle = overdueStateApplicatorBundle;
         this.catalogService = catalogService;
@@ -45,7 +46,6 @@ public class OverdueWrapperFactory {
         this.clock = clock;
     }
 
-
     @SuppressWarnings("unchecked")
     public <T extends Overdueable> OverdueWrapper<T> createOverdueWrapperFor(T overdueable) throws OverdueError {
         try {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java b/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
index b49a670..c0efba2 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/dao/TestOverdueDao.java
@@ -78,12 +78,12 @@ public class TestOverdueDao {
         OverdueState<SubscriptionBundle> state = BrainDeadProxyFactory.createBrainDeadProxyFor(OverdueState.class);
         ((ZombieControl)state).addResult("getName", overdueStateName);
         
-        dao.setOverdueStateForBundle(bundle, state, clock);
+        dao.setOverdueState(bundle, state, clock);
         clock.setDeltaFromReality(1000 * 3600 * 24);
         
         String overdueStateName2 = "NoReallyThisCantGoOn";
         ((ZombieControl)state).addResult("getName", overdueStateName2);
-        dao.setOverdueStateForBundle(bundle, state, clock);
+        dao.setOverdueState(bundle, state, clock);
         
         Assert.assertEquals(accessDao.getOverdueStateNameFor(bundle), overdueStateName2);
         
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 9253e2d..f0a54f3 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -60,6 +60,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     public void clearNotification(@Bind("id") long id, @Bind("owner") String owner);
 
     @SqlUpdate
+    public void removeNotificationsByKey(@Bind("notification_key") String key);
+    
+    @SqlUpdate
     public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
 
     @SqlUpdate
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index c0c88fe..a0e1827 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.notificationq;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
@@ -118,4 +119,10 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
         }
     }
+
+    @Override
+    public void removeNotificationsByKey(UUID key) {
+        dao.removeNotificationsByKey(key.toString());
+        
+    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
index 3200424..32cba9d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -24,7 +24,8 @@ public interface NotificationLifecycle {
     public enum NotificationLifecycleState {
         AVAILABLE,
         IN_PROCESSING,
-        PROCESSED
+        PROCESSED,
+        REMOVED
     }
 
     public String getOwner();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index fb88d4c..14b68e0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.UUID;
+
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
@@ -44,6 +46,13 @@ public interface NotificationQueue {
            final DateTime futureNotificationTime, final NotificationKey notificationKey);
 
    /**
+    * Remove all notifications associated with this key   
+    * 
+    * @param key
+    */
+   public void removeNotificationsByKey(UUID key);
+
+   /**
     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
     * In which case, it will callback users for all the ready notifications.
     *
@@ -71,4 +80,6 @@ public interface NotificationQueue {
     */
    public String getFullQName();
 
+   
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java b/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
index 5b7e6be..eb1860e 100644
--- a/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
+++ b/util/src/main/java/com/ning/billing/util/overdue/DefaultOverdueAcessApi.java
@@ -17,7 +17,14 @@
 package com.ning.billing.util.overdue;
 
 import com.google.inject.Inject;
+import com.ning.billing.ErrorCode;
+import com.ning.billing.catalog.api.CatalogApiException;
+import com.ning.billing.catalog.api.StaticCatalog;
+import com.ning.billing.catalog.api.overdue.OverdueError;
+import com.ning.billing.catalog.api.overdue.OverdueState;
+import com.ning.billing.catalog.api.overdue.OverdueStateSet;
 import com.ning.billing.catalog.api.overdue.Overdueable;
+import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.util.overdue.dao.OverdueAccessDao;
 
 public class DefaultOverdueAcessApi implements OverdueAccessApi {
@@ -33,4 +40,16 @@ public class DefaultOverdueAcessApi implements OverdueAccessApi {
         return dao.getOverdueStateNameFor(overdueable);
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable, StaticCatalog catalog) throws OverdueError {
+        try {
+            String stateName = getOverdueStateNameFor(overdueable);
+            OverdueStateSet<SubscriptionBundle> states = catalog.currentBundleOverdueStateSet();
+            return (OverdueState<T>) states.findState(stateName);
+        } catch (CatalogApiException e) {
+            throw new OverdueError(e, ErrorCode.OVERDUE_CAT_ERROR_ENCOUNTERED,overdueable.getId(), overdueable.getClass().getSimpleName());
+        }
+    }
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java b/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
index 4ca82d8..496a059 100644
--- a/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
+++ b/util/src/main/java/com/ning/billing/util/overdue/OverdueAccessApi.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.util.overdue;
 
+import com.ning.billing.catalog.api.StaticCatalog;
+import com.ning.billing.catalog.api.overdue.OverdueError;
+import com.ning.billing.catalog.api.overdue.OverdueState;
 import com.ning.billing.catalog.api.overdue.Overdueable;
 
 public interface OverdueAccessApi {
@@ -23,5 +26,8 @@ public interface OverdueAccessApi {
 
     public String getOverdueStateNameFor(Overdueable overdueable);
 
+    public <T extends Overdueable> OverdueState<T> getOverdueStateFor(T overdueable, StaticCatalog catalog)
+            throws OverdueError;
+
 
 }
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 7a7ecab..899e828 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -16,6 +16,7 @@ getReadyNotifications(now, max) ::= <<
       effective_dt \<= :now
       and queue_name = :queue_name
       and processing_state != 'PROCESSED'
+      and processing_state != 'REMOVED'
       and (processing_owner IS NULL OR processing_available_dt \<= :now)
     order by
       effective_dt asc
@@ -35,6 +36,7 @@ claimNotification(owner, next_available, id, now) ::= <<
     where
       id = :id
       and processing_state != 'PROCESSED'
+      and processing_state != 'REMOVED'
       and (processing_owner IS NULL OR processing_available_dt \<= :now)
     ;
 >>
@@ -48,6 +50,16 @@ clearNotification(id, owner) ::= <<
     ;
 >>
 
+removeNotificationsByKey(notification_key) ::= <<
+    update notifications
+    set
+      processing_state = 'REMOVED'
+    where
+      notification_key = :notification_key
+    ;
+>>
+
+
 insertNotification() ::= <<
     insert into notifications (
       notification_id
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 1c40988..c78c772 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -21,6 +21,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -109,4 +110,20 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         }
         return result;
     }
+
+    @Override
+    public void removeNotificationsByKey(UUID key) {
+        List<Notification> toClearNotifications = new ArrayList<Notification>();
+        for (Notification notification : notifications) {
+            if (notification.getNotificationKey().equals(key.toString())) {
+                    toClearNotifications.add(notification);
+            }
+        }
+        synchronized(notifications) {
+            if (toClearNotifications.size() > 0) {
+                notifications.removeAll(toClearNotifications);
+            }
+        }
+        
+    }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index fefbcdb..6c60a8c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -108,6 +108,7 @@ public class TestNotificationQueue {
         });
         // Reset time to real value
         ((ClockMock) clock).resetDeltaFromReality();
+        eventsReceived=0;
     }
 
 
@@ -406,6 +407,82 @@ public class TestNotificationQueue {
             }
         };
     }
+    
+    
+    @Test(groups="slow")
+    public void testRemoveNotifications() throws InterruptedException {
+        
+        final UUID key = UUID.randomUUID();
+        final NotificationKey notificationKey = new NotificationKey() {
+            @Override
+            public String toString() {
+                return key.toString();
+            }
+        };        
+        final UUID key2 = UUID.randomUUID();
+        final NotificationKey notificationKey2 = new NotificationKey() {
+            @Override
+            public String toString() {
+                return key2.toString();
+            }
+        };        
+
+        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+                new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(String key, DateTime eventDateTime) {
+                    if(key.equals(notificationKey) || key.equals(notificationKey2)) { //ignore stray events from other tests
+                        log.info("Received notification with key: " + notificationKey);
+                        eventsReceived++;
+                    }
+            }
+        },
+        getNotificationConfig(false, 100, 10, 10000));
+
+
+        queue.startQueue();
+
+        final DateTime start = clock.getUTCNow().plusHours(1);
+        final int nextReadyTimeIncrementMs = 1000;
+ 
+        // add 3 events
+
+        dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+            @Override
+            public Void inTransaction(DummySqlTest transactional,
+                    TransactionStatus status) throws Exception {
+
+                queue.recordFutureNotificationFromTransaction(transactional,
+                        start.plus(nextReadyTimeIncrementMs), notificationKey);
+                queue.recordFutureNotificationFromTransaction(transactional,
+                        start.plus(2 *nextReadyTimeIncrementMs), notificationKey);
+                queue.recordFutureNotificationFromTransaction(transactional,
+                        start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
+                return null;
+            }
+        });
+    
+    
+      queue.removeNotificationsByKey(key); // should remove 2 of the 3
+
+    // Move time in the future after the notification effectiveDate
+        ((ClockMock) clock).setDeltaFromReality(4000000 + nextReadyTimeIncrementMs * 3 );
+        
+        try {
+            await().atMost(10, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return eventsReceived >= 2;
+                }
+            });
+            Assert.fail("There should only have been only one event left in the queue we got: " + eventsReceived);
+        } catch (Exception e) {
+            // expected behavior
+        }
+        log.info("Received " + eventsReceived + " events");
+        queue.stopQueue();
+    }
+
 
 
     public static class TestNotificationQueueModule extends AbstractModule {