killbill-aplcache

Changes

entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 251(+0 -251)

entitlement/src/main/java/com/ning/billing/entitlement/engine/core/DefaultApiEventProcessor.java 63(+0 -63)

entitlement/src/test/java/com/ning/billing/entitlement/engine/core/MockApiEventProcessorMemory.java 63(+0 -63)

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
index c2f5878..19b851d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
@@ -18,7 +18,11 @@ package com.ning.billing.entitlement.api.test;
 
 import com.google.inject.Inject;
 import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,20 +32,32 @@ public class DefaultEntitlementTestApi implements EntitlementTestApi {
 
     private final static Logger log = LoggerFactory.getLogger(DefaultEntitlementTestApi.class);
 
-    private final EventNotifier apiEventProcessor;
     private final EntitlementConfig config;
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public DefaultEntitlementTestApi(EventNotifier apiEventProcessor, EntitlementConfig config) {
-        this.apiEventProcessor = apiEventProcessor;
+    public DefaultEntitlementTestApi(NotificationQueueService notificationQueueService, EntitlementConfig config) {
         this.config = config;
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
     public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
         if (config.isEventProcessingOff()) {
             log.warn("Running event processing loop");
-            apiEventProcessor.processAllReadyEvents(subscriptionsIds, recursive, oneEventOnly);
+            NotificationQueue queue = getNotificationQueue();
+            queue.processReadyNotification();
+
+        }
+    }
+
+    private NotificationQueue getNotificationQueue() {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                    Engine.NOTIFICATION_QUEUE_NAME);
+            return subscritionEventQueue;
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3eb5dd3..7bd1124 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.entitlement.engine.core;
 
+import java.util.UUID;
+
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +48,16 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.eventbus.EventBus;
 import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
 public class Engine implements EventListener, EntitlementService {
 
-    private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
+    public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
+    public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
     private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
     private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
@@ -58,33 +67,36 @@ public class Engine implements EventListener, EntitlementService {
 
     private final Clock clock;
     private final EntitlementDao dao;
-    private final EventNotifier apiEventProcessor;
     private final PlanAligner planAligner;
     private final EntitlementUserApi userApi;
     private final EntitlementBillingApi billingApi;
     private final EntitlementTestApi testApi;
     private final EntitlementMigrationApi migrationApi;
     private final EventBus eventBus;
+    private final EntitlementConfig config;
+    private final NotificationQueueService notificationQueueService;
 
     private boolean startedNotificationThread;
+    private boolean stoppedNotificationThread;
+    private NotificationQueue subscritionEventQueue;
 
     @Inject
-    public Engine(Clock clock, EntitlementDao dao, EventNotifier apiEventProcessor,
-            PlanAligner planAligner, EntitlementConfig config, DefaultEntitlementUserApi userApi,
+    public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
+            EntitlementConfig config, DefaultEntitlementUserApi userApi,
             DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
-            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus) {
+            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+            NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
         this.dao = dao;
-        this.apiEventProcessor = apiEventProcessor;
         this.planAligner = planAligner;
         this.userApi = userApi;
         this.testApi = testApi;
         this.billingApi = billingApi;
         this.migrationApi = migrationApi;
+        this.config = config;
         this.eventBus = eventBus;
-
-        this.startedNotificationThread = false;
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
@@ -92,20 +104,75 @@ public class Engine implements EventListener, EntitlementService {
         return ENTITLEMENT_SERVICE_NAME;
     }
 
-
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
     public void initialize() {
+
+        try {
+            this.stoppedNotificationThread = false;
+            this.startedNotificationThread = false;
+            subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+                    NOTIFICATION_QUEUE_NAME,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey) {
+                    EntitlementEvent event = dao.getEventById(UUID.fromString(notificationKey));
+                    if (event == null) {
+                        log.warn("Failed to extract event for notification key {}", notificationKey);
+                    } else {
+                        processEventReady(event);
+                    }
+                }
+
+                @Override
+                public void completedQueueStop() {
+                    synchronized (this) {
+                        stoppedNotificationThread = true;
+                        this.notifyAll();
+                    }
+                }
+                @Override
+                public void completedQueueStart() {
+                    synchronized (this) {
+                        startedNotificationThread = true;
+                        this.notifyAll();
+                    }
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isEventProcessingOff();
+                }
+                @Override
+                public long getNotificationSleepTimeMs() {
+                    return config.getNotificationSleepTimeMs();
+                }
+                @Override
+                public int getDaoMaxReadyEvents() {
+                    return config.getDaoMaxReadyEvents();
+                }
+                @Override
+                public long getDaoClaimTimeMs() {
+                    return config.getDaoMaxReadyEvents();
+                }
+            });
+        } catch (NotficationQueueAlreadyExists e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
-        apiEventProcessor.startNotifications(this);
+        subscritionEventQueue.startQueue();
         waitForNotificationStartCompletion();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
-        apiEventProcessor.stopNotifications();
+        if (subscritionEventQueue != null) {
+            subscritionEventQueue.stopQueue();
+            waitForNotificationStopCompletion();
+        }
         startedNotificationThread = false;
     }
 
@@ -133,6 +200,9 @@ public class Engine implements EventListener, EntitlementService {
 
     @Override
     public void processEventReady(EntitlementEvent event) {
+        if (!event.isActive()) {
+            return;
+        }
         SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId());
         if (subscription == null) {
             log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
@@ -148,23 +218,20 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-    //
-    // We want to ensure the notification thread is indeed started when we return from start()
-    //
-    @Override
-    public void completedNotificationStart() {
-        synchronized (this) {
-            startedNotificationThread = true;
-            this.notifyAll();
-        }
+    private void waitForNotificationStartCompletion() {
+        waitForNotificationEventCompletion(true);
     }
 
-    private void waitForNotificationStartCompletion() {
+    private void waitForNotificationStopCompletion() {
+        waitForNotificationEventCompletion(false);
+    }
+
+    private void waitForNotificationEventCompletion(boolean startEvent) {
 
         long ini = System.nanoTime();
         synchronized(this) {
             do {
-                if (startedNotificationThread) {
+                if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
                     break;
                 }
                 try {
@@ -173,14 +240,18 @@ public class Engine implements EventListener, EntitlementService {
                     Thread.currentThread().interrupt();
                     throw new EntitlementError(e);
                 }
-            } while (!startedNotificationThread &&
+            } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
                     (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
 
-            if (!startedNotificationThread) {
-                log.error("Could not start notification thread in %d msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+            if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
+                log.error("Could not {} notification thread in {} msec !!!",
+                        (startEvent ? "start" : "stop"),
+                        MAX_NOTIFICATION_THREAD_WAIT_MS);
                 throw new EntitlementError("Failed to start service!!");
             }
-            log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+            log.info("Notification thread has been {} in {} ms",
+                    (startEvent ? "started" : "stopped"),
+                    (System.nanoTime() - ini) / NANO_TO_MS);
         }
     }
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
index 7a84c51..e9962d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
@@ -24,5 +24,4 @@ public interface EventListener {
 
     public void processEventReady(EntitlementEvent event);
 
-    public void completedNotificationStart();
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index b118110..ea62b84 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -54,14 +54,12 @@ public interface EntitlementDao {
     // Event apis
     public void createNextPhaseEvent(UUID subscriptionId, EntitlementEvent nextPhase);
 
+    public EntitlementEvent getEventById(UUID eventId);
+
     public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId);
 
     public List<EntitlementEvent> getPendingEventsForSubscription(UUID subscriptionId);
 
-    public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId);
-
-    public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared);
-
     // Subscription creation, cancellation, changePlan apis
     public void createSubscription(SubscriptionData subscription, List<EntitlementEvent> initialEvents);
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index fc4db0f..72a71d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -17,7 +17,6 @@
 package com.ning.billing.entitlement.engine.dao;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -25,7 +24,6 @@ import java.util.UUID;
 
 import com.google.inject.Inject;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -35,16 +33,23 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
-import com.ning.billing.util.Hostname;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,19 +62,17 @@ public class EntitlementSqlDao implements EntitlementDao {
     private final SubscriptionSqlDao subscriptionsDao;
     private final BundleSqlDao bundlesDao;
     private final EventSqlDao eventsDao;
-    private final EntitlementConfig config;
-    private final String hostname;
     private final SubscriptionFactory factory;
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public EntitlementSqlDao(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+    public EntitlementSqlDao(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
         this.clock = clock;
-        this.config = config;
         this.factory = factory;
         this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
         this.eventsDao = dbi.onDemand(EventSqlDao.class);
         this.bundlesDao = dbi.onDemand(BundleSqlDao.class);
-        this.hostname = Hostname.get();
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
@@ -146,11 +149,24 @@ public class EntitlementSqlDao implements EntitlementDao {
                     TransactionStatus status) throws Exception {
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
                 dao.insertEvent(nextPhase);
+                recordFutureNotificationFromTransaction(dao,
+                        nextPhase.getEffectiveDate(),
+                        new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return nextPhase.getId().toString();
+                            }
+                        });
                 return null;
             }
         });
     }
 
+    @Override
+    public EntitlementEvent getEventById(UUID eventId) {
+        return eventsDao.getEventById(eventId.toString());
+    }
+
 
     @Override
     public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId) {
@@ -165,61 +181,6 @@ public class EntitlementSqlDao implements EntitlementDao {
         return results;
     }
 
-    @Override
-    public List<EntitlementEvent> getEventsReady(final UUID ownerId, final int sequenceId) {
-
-        final Date now = clock.getUTCNow().toDate();
-        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
-        log.debug(String.format("EntitlementDao getEventsReady START effectiveNow =  %s", now));
-
-        List<EntitlementEvent> events = eventsDao.inTransaction(new Transaction<List<EntitlementEvent>, EventSqlDao>() {
-
-            @Override
-            public List<EntitlementEvent> inTransaction(EventSqlDao dao,
-                    TransactionStatus status) throws Exception {
-
-                List<EntitlementEvent> claimedEvents = new ArrayList<EntitlementEvent>();
-                List<EntitlementEvent> input = dao.getReadyEvents(now, config.getDaoMaxReadyEvents());
-                for (EntitlementEvent cur : input) {
-                    final boolean claimed = (dao.claimEvent(ownerId.toString(), nextAvailable, cur.getId().toString(), now) == 1);
-                    if (claimed) {
-                        claimedEvents.add(cur);
-                        dao.insertClaimedHistory(sequenceId, ownerId.toString(), hostname, now, cur.getId().toString());
-                    }
-                }
-                return claimedEvents;
-            }
-        });
-
-        for (EntitlementEvent cur : events) {
-            log.debug(String.format("EntitlementDao %s [host %s] claimed events %s", ownerId, hostname, cur.getId()));
-            if (cur.getOwner() != null && !cur.getOwner().equals(ownerId)) {
-                log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
-            }
-        }
-        return events;
-    }
-
-    @Override
-    public void clearEventsReady(final UUID ownerId, final Collection<EntitlementEvent> cleared) {
-
-        log.debug(String.format("EntitlementDao clearEventsReady START cleared size = %d", cleared.size()));
-
-        eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
-
-            @Override
-            public Void inTransaction(EventSqlDao dao,
-                    TransactionStatus status) throws Exception {
-                // STEPH Same here batch would nice
-                for (EntitlementEvent cur : cleared) {
-                    dao.clearEvent(cur.getId().toString(), ownerId.toString());
-                    log.debug(String.format("EntitlementDao %s [host %s] cleared events %s", ownerId, hostname, cur.getId()));
-                }
-                return null;
-            }
-        });
-    }
 
     @Override
     public void createSubscription(final SubscriptionData subscription,
@@ -234,8 +195,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                 dao.insertSubscription(subscription);
                 // STEPH batch as well
                 EventSqlDao eventsDaoFromSameTranscation = dao.become(EventSqlDao.class);
-                for (EntitlementEvent cur : initialEvents) {
+                for (final EntitlementEvent cur : initialEvents) {
                     eventsDaoFromSameTranscation.insertEvent(cur);
+                    recordFutureNotificationFromTransaction(dao,
+                            cur.getEffectiveDate(),
+                            new NotificationKey() {
+                                @Override
+                                public String toString() {
+                                    return cur.getId().toString();
+                                }
+                            });
                 }
                 return null;
             }
@@ -252,6 +221,14 @@ public class EntitlementSqlDao implements EntitlementDao {
                 cancelNextChangeEventFromTransaction(subscriptionId, dao);
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
                 dao.insertEvent(cancelEvent);
+                recordFutureNotificationFromTransaction(dao,
+                        cancelEvent.getEffectiveDate(),
+                        new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return cancelEvent.getId().toString();
+                            }
+                        });
                 return null;
             }
         });
@@ -281,8 +258,16 @@ public class EntitlementSqlDao implements EntitlementDao {
 
                 if (existingCancelId != null) {
                     dao.unactiveEvent(existingCancelId.toString(), now);
-                    for (EntitlementEvent cur : uncancelEvents) {
+                    for (final EntitlementEvent cur : uncancelEvents) {
                         dao.insertEvent(cur);
+                        recordFutureNotificationFromTransaction(dao,
+                                cur.getEffectiveDate(),
+                                new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return cur.getId().toString();
+                            }
+                        });
                     }
                 }
                 return null;
@@ -298,8 +283,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                     TransactionStatus status) throws Exception {
                 cancelNextChangeEventFromTransaction(subscriptionId, dao);
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
-                for (EntitlementEvent cur : changeEvents) {
+                for (final EntitlementEvent cur : changeEvents) {
                     dao.insertEvent(cur);
+                    recordFutureNotificationFromTransaction(dao,
+                            cur.getEffectiveDate(),
+                            new NotificationKey() {
+                        @Override
+                        public String toString() {
+                            return cur.getId().toString();
+                        }
+                    });
                 }
                 return null;
             }
@@ -372,8 +365,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                     SubscriptionBundleData bundleData = curBundle.getData();
                     for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
                         SubscriptionData subData = curSubscription.getData();
-                        for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+                        for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                             transEventDao.insertEvent(curEvent);
+                            recordFutureNotificationFromTransaction(transEventDao,
+                                    curEvent.getEffectiveDate(),
+                                    new NotificationKey() {
+                                @Override
+                                public String toString() {
+                                    return curEvent.getId().toString();
+                                }
+                            });
                         }
                         transSubDao.insertSubscription(subData);
                     }
@@ -384,6 +385,7 @@ public class EntitlementSqlDao implements EntitlementDao {
         });
     }
 
+
     @Override
     public void undoMigration(final UUID accountId) {
 
@@ -412,4 +414,14 @@ public class EntitlementSqlDao implements EntitlementDao {
             transBundleDao.removeBundle(curBundle.getId().toString());
         }
     }
+
+    private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                Engine.NOTIFICATION_QUEUE_NAME);
+            subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
index de61217..5f485e5 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
@@ -19,7 +19,6 @@ package com.ning.billing.entitlement.engine.dao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.EventBaseBuilder;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEventBuilder;
 import com.ning.billing.entitlement.events.phase.PhaseEventData;
@@ -49,43 +48,31 @@ import java.util.UUID;
 @ExternalizedSqlViaStringTemplate3()
 public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transmogrifier  {
 
-    //
-    // APIs for event notifications
-    //
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
-    public List<EntitlementEvent> getReadyEvents(@Bind("now") Date now, @Bind("max") int max);
+    @Mapper(EventSqlMapper.class)
+    public EntitlementEvent getEventById(@Bind("event_id") String eventId);
 
     @SqlUpdate
-    public int claimEvent(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("event_id") String eventId, @Bind("now") Date now);
+    public void insertEvent(@Bind(binder = EventSqlDaoBinder.class) EntitlementEvent evt);
 
     @SqlUpdate
     public void removeEvents(@Bind("subscription_id") String subscriptionId);
 
     @SqlUpdate
-    public void clearEvent(@Bind("event_id") String eventId, @Bind("owner") String owner);
-
-    @SqlUpdate
-    public void insertEvent(@Bind(binder = IEventSqlDaoBinder.class) EntitlementEvent evt);
-
-    @SqlUpdate
-    public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner_id") String ownerId, @Bind("hostname") String hostname, @Bind("claimed_dt") Date clainedDate, @Bind("event_id") String eventId);
-
-    @SqlUpdate
     public void unactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
 
     @SqlUpdate
     public void reactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
 
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
+    @Mapper(EventSqlMapper.class)
     public List<EntitlementEvent> getFutureActiveEventForSubscription(@Bind("subscription_id") String subscriptionId, @Bind("now") Date now);
 
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
+    @Mapper(EventSqlMapper.class)
     public List<EntitlementEvent> getEventsForSubscription(@Bind("subscription_id") String subscriptionId);
 
-    public static class IEventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
+    public static class EventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
 
         private Date getDate(DateTime dateTime) {
             return dateTime == null ? null : dateTime.toDate();
@@ -106,13 +93,10 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
             stmt.bind("plist_name", (evt.getType() == EventType.API_USER) ? ((ApiEvent) evt).getPriceList() : null);
             stmt.bind("current_version", evt.getActiveVersion());
             stmt.bind("is_active", evt.isActive());
-            stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
-            stmt.bind("processing_owner", (String) null);
-            stmt.bind("processing_state", EventLifecycleState.AVAILABLE.toString());
         }
     }
 
-    public static class IEventSqlMapper implements ResultSetMapper<EntitlementEvent> {
+    public static class EventSqlMapper implements ResultSetMapper<EntitlementEvent> {
 
         private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
             final Timestamp resultStamp = r.getTimestamp(fieldName);
@@ -135,9 +119,6 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
             String priceListName = r.getString("plist_name");
             long currentVersion = r.getLong("current_version");
             boolean isActive = r.getBoolean("is_active");
-            DateTime nextAvailableDate = getDate(r, "processing_available_dt");
-            UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
-            EventLifecycleState processingState = EventLifecycleState.valueOf(r.getString("processing_state"));
 
             EventBaseBuilder<?> base = ((eventType == EventType.PHASE) ?
                     new PhaseEventBuilder() :
@@ -148,11 +129,7 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
                         .setEffectiveDate(effectiveDate)
                         .setProcessedDate(createdDate)
                         .setActiveVersion(currentVersion)
-                        .setActive(isActive)
-                        .setProcessingOwner(processingOwner)
-                        .setNextAvailableProcessingTime(nextAvailableDate)
-                        .setProcessingState(processingState);
-
+                        .setActive(isActive);
 
             EntitlementEvent result = null;
             if (eventType == EventType.PHASE) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
index af74752..b7bfece 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
@@ -21,7 +21,7 @@ import org.joda.time.DateTime;
 import java.util.UUID;
 
 
-public interface EntitlementEvent extends EventLifecycle, Comparable<EntitlementEvent> {
+public interface EntitlementEvent extends Comparable<EntitlementEvent> {
 
     public enum EventType {
         API_USER,
@@ -32,6 +32,16 @@ public interface EntitlementEvent extends EventLifecycle, Comparable<Entitlement
 
     public UUID getId();
 
+    public long getActiveVersion();
+
+    public void setActiveVersion(long activeVersion);
+
+    public boolean isActive();
+
+    public void deactivate();
+
+    public void reactivate();
+
     public DateTime getProcessedDate();
 
     public DateTime getRequestedDate();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
index 93dc9e1..9420fbf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
@@ -30,12 +30,8 @@ public abstract class EventBase implements EntitlementEvent {
     private final DateTime effectiveDate;
     private final DateTime processedDate;
 
-    // Lifecyle of the event
     private long activeVersion;
     private boolean isActive;
-    private UUID processingOwner;
-    private DateTime nextAvailableProcessingTime;
-    private EventLifecycleState processingState;
 
     public EventBase(EventBaseBuilder<?> builder) {
         this.uuid = builder.getUuid();
@@ -46,31 +42,17 @@ public abstract class EventBase implements EntitlementEvent {
 
         this.activeVersion = builder.getActiveVersion();
         this.isActive = builder.isActive();
-        this.processingOwner = builder.getProcessingOwner();
-        this.nextAvailableProcessingTime = builder.getNextAvailableProcessingTime();
-        this.processingState = builder.getProcessingState();
     }
 
     public EventBase(UUID subscriptionId, DateTime requestedDate,
             DateTime effectiveDate, DateTime processedDate,
             long activeVersion, boolean isActive) {
-        this(subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive, null, null, EventLifecycleState.AVAILABLE);
-    }
-
-    private EventBase(UUID subscriptionId, DateTime requestedDate,
-            DateTime effectiveDate, DateTime processedDate,
-            long activeVersion, boolean isActive,
-            UUID processingOwner, DateTime nextAvailableProcessingTime,
-            EventLifecycleState processingState) {
-        this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive,
-                processingOwner, nextAvailableProcessingTime, processingState);
+        this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive);
     }
 
     public EventBase(UUID id, UUID subscriptionId, DateTime requestedDate,
             DateTime effectiveDate, DateTime processedDate,
-            long activeVersion, boolean isActive,
-            UUID processingOwner, DateTime nextAvailableProcessingTime,
-            EventLifecycleState processingState) {
+            long activeVersion, boolean isActive) {
         this.uuid = id;
         this.subscriptionId = subscriptionId;
         this.requestedDate = requestedDate;
@@ -79,10 +61,6 @@ public abstract class EventBase implements EntitlementEvent {
 
         this.activeVersion = activeVersion;
         this.isActive = isActive;
-        this.processingOwner = processingOwner;
-        this.nextAvailableProcessingTime = nextAvailableProcessingTime;
-        this.processingState = processingState;
-
     }
 
 
@@ -138,64 +116,9 @@ public abstract class EventBase implements EntitlementEvent {
     }
 
 
-    @Override
-    public UUID getOwner() {
-        return processingOwner;
-    }
-
-    @Override
-    public void setOwner(UUID owner) {
-        this.processingOwner = owner;
-    }
-
-    @Override
-    public DateTime getNextAvailableDate() {
-        return nextAvailableProcessingTime;
-    }
-
-    @Override
-    public void setNextAvailableDate(DateTime dateTime) {
-        this.nextAvailableProcessingTime = dateTime;
-    }
-
-
-    @Override
-    public EventLifecycleState getProcessingState() {
-        return processingState;
-    }
-
-    @Override
-    public void setProcessingState(EventLifecycleState processingState) {
-        this.processingState = processingState;
-    }
-
-    @Override
-    public boolean isAvailableForProcessing(DateTime now) {
-
-        // Event got deactivated, will never be processed
-        if (!isActive) {
-            return false;
-        }
-
-        switch(processingState) {
-        case AVAILABLE:
-            break;
-        case IN_PROCESSING:
-            // Somebody already got the event, not available yet
-            if (nextAvailableProcessingTime.isAfter(now)) {
-                return false;
-            }
-            break;
-        case PROCESSED:
-            return false;
-        default:
-            throw new EntitlementError(String.format("Unkwnon IEvent processing state %s", processingState));
-        }
-        return effectiveDate.isBefore(now);
-    }
 
     //
-    // Really used for unit tesrs only as the sql implementation relies on date first and then event insertion
+    // Really used for unit tests only as the sql implementation relies on date first and then event insertion
     //
     // Order first by:
     // - effectiveDate, followed by processedDate, requestedDate
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
index 104fbef..17f5e15 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.entitlement.events;
 
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import org.joda.time.DateTime;
 
 import java.util.UUID;
@@ -32,15 +31,11 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
 
     private long activeVersion;
     private boolean isActive;
-    private UUID processingOwner;
-    private DateTime nextAvailableProcessingTime;
-    private EventLifecycleState processingState;
 
 
     public EventBaseBuilder() {
         this.uuid = UUID.randomUUID();
         this.isActive = true;
-        this.processingState = EventLifecycleState.AVAILABLE;
     }
 
     public EventBaseBuilder(EventBaseBuilder<?> copy) {
@@ -52,9 +47,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
 
         this.activeVersion = copy.activeVersion;
         this.isActive = copy.isActive;
-        this.processingOwner = copy.processingOwner;
-        this.nextAvailableProcessingTime = copy.nextAvailableProcessingTime;
-        this.processingState = copy.processingState;
     }
 
     public T setUuid(UUID uuid) {
@@ -92,21 +84,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
         return (T) this;
     }
 
-    public T setProcessingOwner(UUID processingOwner) {
-        this.processingOwner = processingOwner;
-        return (T) this;
-    }
-
-    public T setNextAvailableProcessingTime(DateTime nextAvailableProcessingTime) {
-        this.nextAvailableProcessingTime = nextAvailableProcessingTime;
-        return (T) this;
-    }
-
-    public T setProcessingState(EventLifecycleState processingState) {
-        this.processingState = processingState;
-        return (T) this;
-    }
-
     public UUID getUuid() {
         return uuid;
     }
@@ -134,16 +111,4 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
     public boolean isActive() {
         return isActive;
     }
-
-    public UUID getProcessingOwner() {
-        return processingOwner;
-    }
-
-    public DateTime getNextAvailableProcessingTime() {
-        return nextAvailableProcessingTime;
-    }
-
-    public EventLifecycleState getProcessingState() {
-        return processingState;
-    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
index 3f033a4..2438ddf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
@@ -58,17 +58,6 @@ public class ApiEventBase extends EventBase implements ApiEvent {
     }
 
 
-    public ApiEventBase(UUID id, UUID subscriptionId, DateTime processed, String eventPlan, String eventPhase,
-            String priceList, DateTime requestedDate,  ApiEventType eventType, DateTime effectiveDate, long activeVersion,
-            boolean isActive, UUID processingOwner, DateTime nextAvailableProcessingTime,EventLifecycleState processingState) {
-        super(id, subscriptionId, requestedDate, effectiveDate, processed, activeVersion, isActive, processingOwner, nextAvailableProcessingTime, processingState);
-        this.eventType = eventType;
-        this.eventPlan = eventPlan;
-        this.eventPlanPhase = eventPhase;
-        this.eventPriceList = priceList;
-    }
-
-
     @Override
     public ApiEventType getEventType() {
         return eventType;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
index ab04700..a5ab3d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
@@ -30,7 +30,6 @@ import com.ning.billing.entitlement.api.test.EntitlementTestApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionApiService;
-import com.ning.billing.entitlement.engine.core.DefaultApiEventProcessor;
 import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.engine.core.EventNotifier;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -53,10 +52,6 @@ public class EntitlementModule extends AbstractModule {
         bind(EntitlementConfig.class).toInstance(config);
     }
 
-    protected void installApiEventProcessor() {
-        bind(EventNotifier.class).to(DefaultApiEventProcessor.class).asEagerSingleton();
-    }
-
     protected void installEntitlementDao() {
         bind(EntitlementDao.class).to(EntitlementSqlDao.class).asEagerSingleton();
     }
@@ -77,7 +72,6 @@ public class EntitlementModule extends AbstractModule {
     protected void configure() {
         installConfig();
         installClock();
-        installApiEventProcessor();
         installEntitlementDao();
         installEntitlementCore();
     }
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 4540ad8..55ad7f4 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -14,24 +14,9 @@ CREATE TABLE events (
     plist_name varchar(64) DEFAULT NULL,
     current_version int(11) DEFAULT 1,
     is_active bool DEFAULT 1,
-    processing_owner char(36) DEFAULT NULL,
-    processing_available_dt datetime DEFAULT NULL,
-    processing_state varchar(14) DEFAULT 'AVAILABLE',
     PRIMARY KEY(id)
 ) ENGINE=innodb;
 
-DROP TABLE IF EXISTS claimed_events;
-CREATE TABLE claimed_events (
-    id int(11) unsigned NOT NULL AUTO_INCREMENT,    
-    sequence_id int(11) unsigned NOT NULL,    
-    owner_id char(36) NOT NULL,
-    hostname varchar(64) NOT NULL,
-    claimed_dt datetime NOT NULL,
-    event_id char(36) NOT NULL,
-    PRIMARY KEY(id)
-) ENGINE=innodb;
-
-
 DROP TABLE IF EXISTS subscriptions;
 CREATE TABLE subscriptions (
     id char(36) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 31e9eaf..704e2c7 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -1,8 +1,8 @@
 group EventSqlDao;
 
-getReadyEvents(now, max) ::= <<
-    select
-      event_id
+getEventById(event_id) ::= <<
+  select
+     event_id
       , event_type
       , user_type
       , created_dt
@@ -14,48 +14,11 @@ getReadyEvents(now, max) ::= <<
       , phase_name
       , plist_name
       , current_version
-      , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state
-    from events
-    where
-      effective_dt \<= :now
-      and is_active = 1
-      and processing_state != 'PROCESSED'
-      and (processing_owner IS NULL OR processing_available_dt \<= :now)
-    order by
-      effective_dt asc
-      , created_dt asc
-      , requested_dt asc
-      , id asc
-    limit :max
-    ;
->>
-
-claimEvent(owner, next_available, event_id, now) ::= <<
-    update events
-    set
-      processing_owner = :owner
-      , processing_available_dt = :next_available
-      , processing_state = 'IN_PROCESSING'
-    where
-      event_id = :event_id
-      and is_active = 1
-      and processing_state != 'PROCESSED'
-      and (processing_owner IS NULL OR processing_available_dt \<= :now)
-    ;
->>
-
-clearEvent(event_id, owner) ::= <<
-    update events
-    set
-      processing_owner = NULL
-      , processing_state = 'PROCESSED'
-    where
+      , is_active  
+  from events
+  where
       event_id = :event_id
-      and processing_owner = :owner
-    ;
+  ;
 >>
 
 insertEvent() ::= <<
@@ -73,9 +36,6 @@ insertEvent() ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state
     ) values (
       :event_id
       , :event_type
@@ -90,9 +50,6 @@ insertEvent() ::= <<
       , :plist_name
       , :current_version
       , :is_active
-      , :processing_owner
-      , :processing_available_dt
-      , :processing_state
     );   
 >>
 
@@ -103,22 +60,6 @@ removeEvents(subscription_id) ::= <<
     ;
 >>
 
-insertClaimedHistory(sequence_id, owner_id, hostname, claimed_dt, event_id) ::= <<
-    insert into claimed_events (
-        sequence_id
-        , owner_id
-        , hostname
-        , claimed_dt
-        , event_id
-      ) values (
-        :sequence_id
-        , :owner_id
-        , :hostname
-        , :claimed_dt
-        , :event_id
-      );
->>
-
 unactiveEvent(event_id, now) ::= <<
     update events
     set
@@ -154,9 +95,6 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state    
     from events
     where
       subscription_id = :subscription_id
@@ -185,9 +123,6 @@ getEventsForSubscription(subscription_id) ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state       
     from events
     where
       subscription_id = :subscription_id
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
index 187c080..87491c7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
@@ -32,7 +32,7 @@ public class TestUserApiCancelSql extends TestUserApiCancel {
         return Guice.createInjector(Stage.DEVELOPMENT, new MockEngineModuleSql());
     }
 
-    @Test(enabled= true, groups={"stress"})
+    @Test(enabled= false, groups={"stress"})
     public void stressTest() {
         for (int i = 0; i < MAX_STRESS_ITERATIONS; i++) {
             cleanupTest();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index dc6567f..51c3d8b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -198,6 +198,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
             DateTime futureNow = clock.getUTCNow();
+
+            /*
+            try {
+                Thread.sleep(1000 * 3000);
+            } catch (Exception e) {
+
+            }
+            */
+
             assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
             assertTrue(testListener.isCompleted(3000));
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
index 74ac1e7..3623da2 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
@@ -17,5 +17,6 @@
 package com.ning.billing.entitlement.engine.dao;
 
 public interface MockEntitlementDao {
+
     public void reset();
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 7fc8342..86e458f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -31,16 +31,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 
 import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlementDao {
 
@@ -52,15 +62,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     private final Clock clock;
     private final EntitlementConfig config;
     private final SubscriptionFactory factory;
-
-
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+    public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
         this.config = config;
         this.factory = factory;
+        this.notificationQueueService = notificationQueueService;
         this.bundles = new ArrayList<SubscriptionBundle>();
         this.subscriptions = new ArrayList<Subscription>();
         this.events = new TreeSet<EntitlementEvent>();
@@ -138,6 +148,14 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
 
         synchronized(events) {
             events.addAll(initalEvents);
+            for (final EntitlementEvent cur : initalEvents) {
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return cur.getId().toString();
+                    }
+                });
+            }
         }
         Subscription updatedSubscription = buildSubscription(subscription);
         subscriptions.add(updatedSubscription);
@@ -174,7 +192,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             List<EntitlementEvent> results = new LinkedList<EntitlementEvent>();
             for (EntitlementEvent cur : events) {
                 if (cur.isActive() &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE &&
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow()) &&
                             cur.getSubscriptionId().equals(subscriptionId)) {
                     results.add(cur);
                 }
@@ -202,39 +220,6 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
 
-    @Override
-    public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId) {
-        synchronized(events) {
-            List<EntitlementEvent> readyList = new LinkedList<EntitlementEvent>();
-            for (EntitlementEvent cur : events) {
-                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
-
-                    if (cur.getOwner() != null) {
-                        log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
-                    }
-                    cur.setOwner(ownerId);
-                    cur.setNextAvailableDate(clock.getUTCNow().plus(config.getDaoClaimTimeMs()));
-                    cur.setProcessingState(EventLifecycleState.IN_PROCESSING);
-                    readyList.add(cur);
-                }
-            }
-            Collections.sort(readyList);
-            return readyList;
-        }
-    }
-
-    @Override
-    public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared) {
-        synchronized(events) {
-            for (EntitlementEvent cur : cleared) {
-                if (cur.getOwner().equals(ownerId)) {
-                    cur.setProcessingState(EventLifecycleState.PROCESSED);
-                } else {
-                    log.warn(String.format("EventProcessor %s trying to clear event %s that it does not own", ownerId, cur));
-                }
-            }
-        }
-    }
 
     private Subscription buildSubscription(SubscriptionData in) {
         return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId()));
@@ -272,12 +257,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             cancelNextChangeEvent(subscriptionId);
             cancelNextPhaseEvent(subscriptionId);
             events.addAll(changeEvents);
+            for (final EntitlementEvent cur : changeEvents) {
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return cur.getId().toString();
+                    }
+                });
+            }
         }
     }
 
-    private void insertEvent(EntitlementEvent event) {
+    private void insertEvent(final EntitlementEvent event) {
         synchronized(events) {
             events.add(event);
+            recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
+                @Override
+                public String toString() {
+                    return event.getId().toString();
+                }
+            });
         }
     }
 
@@ -298,10 +297,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                     continue;
                 }
                 if (cur.getType() == EventType.PHASE &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
                     cur.deactivate();
                     break;
                 }
+
             }
         }
     }
@@ -319,7 +319,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                 }
                 if (cur.getType() == EventType.API_USER &&
                         ApiEventType.CHANGE == ((ApiEvent) cur).getEventType() &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
                     cur.deactivate();
                     break;
                 }
@@ -364,8 +364,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                 SubscriptionBundleData bundleData = curBundle.getData();
                 for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
                     SubscriptionData subData = curSubscription.getData();
-                    for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+                    for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                         events.add(curEvent);
+                        recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return curEvent.getId().toString();
+                            }
+                        });
+
                     }
                     subscriptions.add(subData);
                 }
@@ -393,4 +400,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
         }
 
     }
+
+    @Override
+    public EntitlementEvent getEventById(UUID eventId) {
+        synchronized(events) {
+            for (EntitlementEvent cur : events) {
+                if (cur.getId().equals(eventId)) {
+                    return cur;
+                }
+            }
+        }
+        return null;
+    }
+
+    private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                Engine.NOTIFICATION_QUEUE_NAME);
+            subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2ebdea9..503fd1a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -20,6 +20,8 @@ import com.google.inject.Inject;
 import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -32,11 +34,12 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
     private final ResetSqlDao resetDao;
 
     @Inject
-    public MockEntitlementDaoSql(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
-        super(dbi, clock, config, factory);
+    public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+        super(dbi, clock, factory, notificationQueueService);
         this.resetDao = dbi.onDemand(ResetSqlDao.class);
     }
 
+
     @Override
     public void reset() {
         resetDao.inTransaction(new Transaction<Void, ResetSqlDao>() {
@@ -45,9 +48,10 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
             public Void inTransaction(ResetSqlDao dao, TransactionStatus status)
                     throws Exception {
                 resetDao.resetEvents();
-                resetDao.resetClaimedEvents();
                 resetDao.resetSubscriptions();
                 resetDao.resetBundles();
+                resetDao.resetClaimedNotifications();
+                resetDao.resetNotifications();
                 return null;
             }
         });
@@ -58,14 +62,17 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
         @SqlUpdate("truncate table events")
         public void resetEvents();
 
-        @SqlUpdate("truncate table claimed_events")
-        public void resetClaimedEvents();
-
         @SqlUpdate("truncate table subscriptions")
         public void resetSubscriptions();
 
         @SqlUpdate("truncate table bundles")
         public void resetBundles();
-    }
 
+        @SqlUpdate("truncate table notifications")
+        public void resetNotifications();
+
+        @SqlUpdate("truncate table claimed_notifications")
+        public void resetClaimedNotifications();
+
+    }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
index dc422a6..85f5e0a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
@@ -20,6 +20,7 @@ import com.ning.billing.catalog.glue.CatalogModule;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.glue.EventBusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
 
 public class MockEngineModule extends EntitlementModule {
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index c6ce269..786f1e3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,16 +17,12 @@
 package com.ning.billing.entitlement.glue;
 
 
-import com.ning.billing.entitlement.engine.core.EventNotifier;
-import com.ning.billing.entitlement.engine.core.MockApiEventProcessorMemory;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class MockEngineModuleMemory extends MockEngineModule {
-    @Override
-    protected void installApiEventProcessor() {
-        bind(EventNotifier.class).to(MockApiEventProcessorMemory.class).asEagerSingleton();
-    }
 
     @Override
     protected void installEntitlementDao() {
@@ -34,8 +30,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
     }
 
 
+    private void installNotificationQueue() {
+        bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+    }
     @Override
     protected void configure() {
         super.configure();
+        installNotificationQueue();
     }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
index f1cd237..dbe2938 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
@@ -22,6 +22,8 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoSql;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.glue.NotificationQueueModule;
+
 import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.DBI;
 
@@ -47,6 +49,7 @@ public class MockEngineModuleSql extends MockEngineModule {
     @Override
     protected void configure() {
         installDBI();
+        install(new NotificationQueueModule());
         super.configure();
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 802d16b..818d831 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -97,7 +97,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final String notificationKey = r.getString("notification_key");
             final DateTime effectiveDate = getDate(r, "effective_dt");
             final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
-            final UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
+            final String processingOwner = r.getString("processing_owner");
             final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
 
             return new DefaultNotification(id, processingOwner, nextAvailableDate,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 56a8547..2946e13 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -23,14 +23,14 @@ import org.joda.time.DateTime;
 public class DefaultNotification implements Notification {
 
     private final UUID id;
-    private final UUID owner;
+    private final String owner;
     private final DateTime nextAvailableDate;
     private final NotificationLifecycleState lifecycleState;
     private final String notificationKey;
     private final DateTime effectiveDate;
 
 
-    public DefaultNotification(UUID id, UUID owner, DateTime nextAvailableDate,
+    public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
             NotificationLifecycleState lifecycleState,
             String notificationKey, DateTime effectiveDate) {
         super();
@@ -52,7 +52,7 @@ public class DefaultNotification implements Notification {
     }
 
     @Override
-    public UUID getOwner() {
+    public String getOwner() {
         return owner;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
new file mode 100644
index 0000000..80f7385
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+public class DefaultNotificationQueue extends NotificationQueueBase {
+
+    protected final NotificationSqlDao dao;
+
+    public DefaultNotificationQueue(final DBI dbi, final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        super(clock, svcName, queueName, handler, config);
+        this.dao = dbi.onDemand(NotificationSqlDao.class);
+    }
+
+    @Override
+    protected void doProcessEvents(int sequenceId) {
+        List<Notification> notifications = getReadyNotifications(sequenceId);
+        for (Notification cur : notifications) {
+            nbProcessedEvents.incrementAndGet();
+            handler.handleReadyNotification(cur.getNotificationKey());
+        }
+        // If anything happens before we get to clear those notifications, somebody else will pick them up
+        clearNotifications(notifications);
+    }
+
+    @Override
+    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+            final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+        NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
+        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        transactionalNotificationDao.insertNotification(notification);
+    }
+
+
+    private void clearNotifications(final Collection<Notification> cleared) {
+
+        log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+                getFullQName(),
+                cleared.size()));
+
+        dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+            @Override
+            public Void inTransaction(NotificationSqlDao transactional,
+                    TransactionStatus status) throws Exception {
+                for (Notification cur : cleared) {
+                    transactional.clearNotification(cur.getId().toString(), hostname);
+                    log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+                }
+                return null;
+            }
+        });
+    }
+
+    private List<Notification> getReadyNotifications(final int seqId) {
+
+        final Date now = clock.getUTCNow().toDate();
+        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+        log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow =  %s",  getFullQName(), now));
+
+        List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+            @Override
+            public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+                    TransactionStatus status) throws Exception {
+
+                List<Notification> claimedNotifications = new ArrayList<Notification>();
+                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+                for (Notification cur : input) {
+                    final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+                    if (claimed) {
+                        claimedNotifications.add(cur);
+                        transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+                    }
+                }
+                return claimedNotifications;
+            }
+        });
+
+        for (Notification cur : result) {
+            log.debug(String.format("NotificationQueue %sclaimed events %s",
+                    getFullQName(), cur.getId()));
+            if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+                log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+                        getFullQName(), cur, cur.getOwner()));
+            }
+        }
+        return result;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 6685e05..5181113 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -16,49 +16,26 @@
 
 package com.ning.billing.util.notificationq;
 
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.skife.jdbi.v2.DBI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.google.inject.Inject;
 import com.ning.billing.util.clock.Clock;
 
-public class DefaultNotificationQueueService implements NotificationQueueService {
-
-    private final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
 
     private final DBI dbi;
-    private final Clock clock;
-
-    private final Map<String, NotificationQueue> queues;
 
+    @Inject
     public DefaultNotificationQueueService(final DBI dbi, final Clock clock) {
+        super(clock);
         this.dbi = dbi;
-        this.clock = clock;
-        this.queues = new TreeMap<String, NotificationQueue>();
     }
 
     @Override
-    public NotificationQueue createNotificationQueue(String svcName,
+    protected NotificationQueue createNotificationQueueInternal(String svcName,
             String queueName, NotificationQueueHandler handler,
             NotificationConfig config) {
-        if (svcName == null || queueName == null || handler == null || config == null) {
-            throw new RuntimeException("Need to specify all parameters");
-        }
-
-        String compositeName = svcName + ":" + queueName;
-        NotificationQueue result = null;
-        synchronized(queues) {
-            result = queues.get(compositeName);
-            if (result == null) {
-                result = new NotificationQueue(dbi, clock, svcName, queueName, handler, config);
-                queues.put(compositeName, result);
-            } else {
-                log.warn("Queue for svc {} and name {} already exist", svcName, queueName);
-            }
-        }
-        return result;
+        return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config);
     }
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
index 7c81108..3200424 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -16,8 +16,6 @@
 
 package com.ning.billing.util.notificationq;
 
-import java.util.UUID;
-
 import org.joda.time.DateTime;
 
 
@@ -29,17 +27,12 @@ public interface NotificationLifecycle {
         PROCESSED
     }
 
-    public UUID getOwner();
-
-    //public void setOwner(UUID owner);
+    public String getOwner();
 
     public DateTime getNextAvailableDate();
 
-    //public void setNextAvailableDate(DateTime dateTime);
-
     public NotificationLifecycleState getProcessingState();
 
-    //public void setProcessingState(NotificationLifecycleState procesingState);
 
     public boolean isAvailableForProcessing(DateTime now);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 5677335..23f0de0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,260 +16,44 @@
 
 package com.ning.billing.util.notificationq;
 
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.ning.billing.util.Hostname;
-import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
-
-
-public class NotificationQueue {
-
-    protected final static Logger log = LoggerFactory.getLogger(NotificationQueue.class);
-
-    private static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
-    private final long STOP_WAIT_TIMEOUT_MS = 60000;
-
-    private final String svcName;
-    private final String queueName;
-    private final NotificationQueueHandler handler;
-    private final NotificationConfig config;
-    private final NotificationSqlDao dao;
-    private final Executor executor;
-    private final Clock clock;
-    private final String hostname;
-
-    private static final AtomicInteger sequenceId = new AtomicInteger();
-
-    protected AtomicLong nbProcessedEvents;
-
-    // Use this object's monitor for synchronization (no need for volatile)
-    protected boolean isProcessingEvents;
-
-    // Package visibility on purpose
-    NotificationQueue(final DBI dbi, final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
-        this.clock = clock;
-        this.svcName = svcName;
-        this.queueName = queueName;
-        this.handler = handler;
-        this.config = config;
-        this.hostname = Hostname.get();
 
-        this.dao = dbi.onDemand(NotificationSqlDao.class);
-
-        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread th = new Thread(r);
-                th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
-                th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-                    @Override
-                    public void uncaughtException(Thread t, Throwable e) {
-                        log.error("Uncaught exception for thread " + t.getName(), e);
-                    }
-                });
-                return th;
-            }
-        });
-    }
-
-    /**
-     *
-     *  Record from within a transaction the need to be called back when the notification is ready
-     *
-     * @param transactionalDao the transactionalDao
-     * @param futureNotificationTime the time at which the notificatoin is ready
-     * @param notificationKey the key for that notification
-     */
-    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
-            final DateTime futureNotificationTime, final NotificationKey notificationKey) {
-        NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
-        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
-        transactionalNotificationDao.insertNotification(notification);
-    }
+public interface NotificationQueue {
 
     /**
-     * Stops the queue.
-     *
-     * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
-     */
-    public void stopQueue() {
-        if (config.isNotificationProcessingOff()) {
-            handler.completedQueueStop();
-            return;
-        }
-
-        synchronized(this) {
-            isProcessingEvents = false;
-            try {
-                log.info("NotificationQueue requested to stop");
-                wait(STOP_WAIT_TIMEOUT_MS);
-                log.info("NotificationQueue requested should have exited");
-            } catch (InterruptedException e) {
-                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
-            }
-        }
-
-    }
-
-    /**
-     * Starts the queue.
-     *
-     * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
-     */
-    public void startQueue() {
-
-        this.isProcessingEvents = true;
-        this.nbProcessedEvents = new AtomicLong();
-
-
-        if (config.isNotificationProcessingOff()) {
-            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
-            handler.completedQueueStart();
-            return;
-        }
-        final NotificationQueue notificationQueue = this;
-
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-
-                log.info(String.format("NotificationQueue thread %s [%d] started",
-                        Thread.currentThread().getName(),
-                        Thread.currentThread().getId()));
-
-                // Thread is now started, notify the listener
-                handler.completedQueueStart();
-
-                try {
-                    while (true) {
-
-                        synchronized (notificationQueue) {
-                            if (!isProcessingEvents) {
-                                log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...",
-                                        Thread.currentThread().getName(),
-                                        Thread.currentThread().getId()));
-                                notificationQueue.notify();
-                                break;
-                            }
-                        }
-
-                        // Callback may trigger exceptions in user code so catch anything here and live with it.
-                        try {
-                            doProcessEvents(sequenceId.getAndIncrement());
-                        } catch (Exception e) {
-                            log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..",
-                                    Thread.currentThread().getName(),
-                                    Thread.currentThread().getId()), e);
-                        }
-                        sleepALittle();
-                    }
-                } catch (InterruptedException e) {
-                    log.warn(Thread.currentThread().getName() + " got interrupted ", e);
-                } catch (Throwable e) {
-                    log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
-                    // Just to make it really obvious in the log
-                    e.printStackTrace();
-                } finally {
-                    handler.completedQueueStop();
-                    log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
-                            Thread.currentThread().getName(),
-                            Thread.currentThread().getId()));
-                }
-            }
-
-            private void sleepALittle() throws InterruptedException {
-                Thread.sleep(config.getNotificationSleepTimeMs());
-            }
-        });
-    }
-
-    private void doProcessEvents(int sequenceId) {
-        List<Notification> notifications = getReadyNotifications(sequenceId);
-        for (Notification cur : notifications) {
-            nbProcessedEvents.incrementAndGet();
-            handler.handleReadyNotification(cur.getNotificationKey());
-        }
-        // If anything happens before we get to clear those notifications, somebody else will pick them up
-        clearNotifications(notifications);
-    }
-
-    private String getFullQName() {
-        return svcName + ":" +  queueName;
-    }
-
-    private void clearNotifications(final Collection<Notification> cleared) {
-
-        log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
-                getFullQName(),
-                cleared.size()));
-
-        dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
-
-            @Override
-            public Void inTransaction(NotificationSqlDao transactional,
-                    TransactionStatus status) throws Exception {
-                for (Notification cur : cleared) {
-                    transactional.clearNotification(cur.getId().toString(), hostname);
-                    log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
-                }
-                return null;
-            }
-        });
-    }
-
-    private List<Notification> getReadyNotifications(final int seqId) {
-
-        final Date now = clock.getUTCNow().toDate();
-        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
-        log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow =  %s",  getFullQName(), now));
-
-        List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
-
-            @Override
-            public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
-                    TransactionStatus status) throws Exception {
+    *
+    *  Record from within a transaction the need to be called back when the notification is ready
+    *
+    * @param transactionalDao the transactionalDao
+    * @param futureNotificationTime the time at which the notification is ready
+    * @param notificationKey the key for that notification
+    */
+   public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+           final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+   /**
+    * This is only valid when the queue has been configured with isNotificationProcessingOff is true
+    * In which case, it will callback users for all the ready notifications.
+    *
+    */
+   public void processReadyNotification();
+
+   /**
+    * Stops the queue.
+    *
+    * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+    */
+   public void stopQueue();
+
+   /**
+    * Starts the queue.
+    *
+    * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+    */
+   public void startQueue();
 
-                List<Notification> claimedNotifications = new ArrayList<Notification>();
-                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
-                for (Notification cur : input) {
-                    final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
-                    if (claimed) {
-                        claimedNotifications.add(cur);
-                        transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
-                    }
-                }
-                return claimedNotifications;
-            }
-        });
 
-        for (Notification cur : result) {
-            log.debug(String.format("NotificationQueue %sclaimed events %s",
-                    getFullQName(), cur.getId()));
-            if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
-                log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
-                        getFullQName(), cur, cur.getOwner()));
-            }
-        }
-        return result;
-    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
new file mode 100644
index 0000000..6eaf33f
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public abstract class NotificationQueueBase implements NotificationQueue {
+
+    protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+
+    protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+    protected final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+    protected final String svcName;
+    protected final String queueName;
+    protected final NotificationQueueHandler handler;
+    protected final NotificationConfig config;
+
+    protected final Executor executor;
+    protected final Clock clock;
+    protected final String hostname;
+
+    protected static final AtomicInteger sequenceId = new AtomicInteger();
+
+    protected AtomicLong nbProcessedEvents;
+
+    // Use this object's monitor for synchronization (no need for volatile)
+    protected boolean isProcessingEvents;
+
+    // Package visibility on purpose
+    NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        this.clock = clock;
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.config = config;
+        this.hostname = Hostname.get();
+
+        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread th = new Thread(r);
+                th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+                th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+                    @Override
+                    public void uncaughtException(Thread t, Throwable e) {
+                        log.error("Uncaught exception for thread " + t.getName(), e);
+                    }
+                });
+                return th;
+            }
+        });
+    }
+
+
+    @Override
+    public void processReadyNotification() {
+        // STEPH to be implemented
+    }
+
+
+    @Override
+    public void stopQueue() {
+        if (config.isNotificationProcessingOff()) {
+            handler.completedQueueStop();
+            return;
+        }
+
+        synchronized(this) {
+            isProcessingEvents = false;
+            try {
+                log.info("NotificationQueue requested to stop");
+                wait(STOP_WAIT_TIMEOUT_MS);
+                log.info("NotificationQueue requested should have exited");
+            } catch (InterruptedException e) {
+                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void startQueue() {
+
+        this.isProcessingEvents = true;
+        this.nbProcessedEvents = new AtomicLong();
+
+
+        if (config.isNotificationProcessingOff()) {
+            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+            handler.completedQueueStart();
+            return;
+        }
+        final NotificationQueueBase notificationQueue = this;
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+
+                log.info(String.format("NotificationQueue thread %s [%d] started",
+                        Thread.currentThread().getName(),
+                        Thread.currentThread().getId()));
+
+                // Thread is now started, notify the listener
+                handler.completedQueueStart();
+
+                try {
+                    while (true) {
+
+                        synchronized (notificationQueue) {
+                            if (!isProcessingEvents) {
+                                log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...",
+                                        Thread.currentThread().getName(),
+                                        Thread.currentThread().getId()));
+                                notificationQueue.notify();
+                                break;
+                            }
+                        }
+
+                        // Callback may trigger exceptions in user code so catch anything here and live with it.
+                        try {
+                            doProcessEvents(sequenceId.getAndIncrement());
+                        } catch (Exception e) {
+                            log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..",
+                                    Thread.currentThread().getName(),
+                                    Thread.currentThread().getId()), e);
+                        }
+                        sleepALittle();
+                    }
+                } catch (InterruptedException e) {
+                    log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+                } catch (Throwable e) {
+                    log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+                    // Just to make it really obvious in the log
+                    e.printStackTrace();
+                } finally {
+                    handler.completedQueueStop();
+                    log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
+                            Thread.currentThread().getName(),
+                            Thread.currentThread().getId()));
+                }
+            }
+
+            private void sleepALittle() throws InterruptedException {
+                Thread.sleep(config.getNotificationSleepTimeMs());
+            }
+        });
+    }
+
+
+    protected String getFullQName() {
+        return svcName + ":" +  queueName;
+    }
+
+    protected abstract void doProcessEvents(int sequenceId);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d1544c8..a18906b 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.util.NoSuchElementException;
+
 
 public interface NotificationQueueService {
 
@@ -37,6 +39,22 @@ public interface NotificationQueueService {
         public void completedQueueStop();
     }
 
+    public static final class NotficationQueueAlreadyExists extends Exception {
+        private static final long serialVersionUID = 1541281L;
+
+        public NotficationQueueAlreadyExists(String msg) {
+            super(msg);
+        }
+    }
+
+    public static final class NoSuchNotificationQueue extends Exception {
+        private static final long serialVersionUID = 1541281L;
+
+        public NoSuchNotificationQueue(String msg) {
+            super(msg);
+        }
+    }
+
     /**
      * Creates a new NotificationQueue for a given associated with the given service and queueName
      *
@@ -46,6 +64,23 @@ public interface NotificationQueueService {
      * @param config the notification queue configuration
      *
      * @return a new NotificationQueue
+     *
+     * @throws NotficationQueueAlreadyExists is the queue associated with that service and name already exits
+     *
      */
-    NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config);
+    NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+        throws NotficationQueueAlreadyExists;
+
+    /**
+     * Retrieves an already created NotificationQueue by service and name if it exists
+     *
+     * @param svcName
+     * @param queueName
+     * @return
+     *
+     * @throws NoSuchNotificationQueue if queue does not exist
+     */
+    NotificationQueue getNotificationQueue(final String svcName, final String queueName)
+        throws NoSuchNotificationQueue;
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
new file mode 100644
index 0000000..a4dc64e
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public abstract class NotificationQueueServiceBase implements NotificationQueueService {
+
+    protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+    protected final Clock clock;
+
+    private final Map<String, NotificationQueue> queues;
+
+    @Inject
+    public NotificationQueueServiceBase(final Clock clock) {
+
+        this.clock = clock;
+        this.queues = new TreeMap<String, NotificationQueue>();
+    }
+
+    @Override
+    public NotificationQueue createNotificationQueue(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config) throws NotficationQueueAlreadyExists {
+        if (svcName == null || queueName == null || handler == null || config == null) {
+            throw new RuntimeException("Need to specify all parameters");
+        }
+
+        String compositeName = getCompositeName(svcName, queueName);
+        NotificationQueue result = null;
+        synchronized(queues) {
+            result = queues.get(compositeName);
+            if (result != null) {
+                throw new NotficationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
+                        svcName, queueName));
+            }
+            result = createNotificationQueueInternal(svcName, queueName, handler, config);
+            queues.put(compositeName, result);
+        }
+        return result;
+    }
+
+    @Override
+    public NotificationQueue getNotificationQueue(String svcName,
+            String queueName) throws NoSuchNotificationQueue {
+
+        NotificationQueue result = null;
+        String compositeName = getCompositeName(svcName, queueName);
+        synchronized(queues) {
+            result = queues.get(compositeName);
+            if (result == null) {
+                throw new NoSuchNotificationQueue(String.format("Queue for svc %s and name %s does not exist",
+                        svcName, queueName));
+            }
+        }
+        return result;
+    }
+
+
+    protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config);
+
+
+    private String getCompositeName(String svcName, String queueName) {
+        return svcName + ":" + queueName;
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
new file mode 100644
index 0000000..11721c8
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+
+
+    private TreeSet<Notification> notifications;
+
+    public MockNotificationQueue(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        super(clock, svcName, queueName, handler, config);
+        notifications = new TreeSet<Notification>(new Comparator<Notification>() {
+            @Override
+            public int compare(Notification o1, Notification o2) {
+                if (o1.getEffectiveDate().equals(o2.getEffectiveDate())) {
+                    return o1.getNotificationKey().compareTo(o2.getNotificationKey());
+                } else {
+                    return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+                }
+            }
+        });
+    }
+
+    @Override
+    public void recordFutureNotificationFromTransaction(
+            Transmogrifier transactionalDao, DateTime futureNotificationTime,
+            NotificationKey notificationKey) {
+        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        synchronized(notifications) {
+            notifications.add(notification);
+        }
+    }
+
+    @Override
+    protected void doProcessEvents(int sequenceId) {
+
+        List<Notification> processedNotifications = new ArrayList<Notification>();
+        synchronized(notifications) {
+            Iterator<Notification> it = notifications.iterator();
+            while (it.hasNext()) {
+                Notification cur = it.next();
+                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+                    handler.handleReadyNotification(cur.getNotificationKey());
+                    DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+                    it.remove();
+                    processedNotifications.add(processedNotification);
+                }
+            }
+            if (processedNotifications.size() > 0) {
+                notifications.addAll(processedNotifications);
+            }
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 7129351..2e3bb3c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -116,7 +116,7 @@ public class TestNotificationQueue {
     public void testSimpleQueueDisabled() throws InterruptedException {
 
         final TestStartStop testStartStop = new TestStartStop(false, false);
-        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "dead",
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
                 new NotificationQueueHandler() {
                     @Override
                     public void handleReadyNotification(String notificationKey) {
@@ -134,7 +134,7 @@ public class TestNotificationQueue {
 
         executeTest(testStartStop, queue, new WithTest() {
             @Override
-            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
                 // Do nothing
             }
         });
@@ -153,7 +153,7 @@ public class TestNotificationQueue {
         final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
 
         final TestStartStop testStartStop = new TestStartStop(false, false);
-        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "foo",
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
                 new NotificationQueueHandler() {
                     @Override
                     public void handleReadyNotification(String notificationKey) {
@@ -176,7 +176,7 @@ public class TestNotificationQueue {
 
         executeTest(testStartStop, queue, new WithTest() {
             @Override
-            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
 
                 final UUID key = UUID.randomUUID();
                 final DummyObject obj = new DummyObject("foo", key);
@@ -229,7 +229,7 @@ public class TestNotificationQueue {
         final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
 
         final TestStartStop testStartStop = new TestStartStop(false, false);
-        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "many",
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
                 new NotificationQueueHandler() {
                     @Override
                     public void handleReadyNotification(String notificationKey) {
@@ -252,7 +252,7 @@ public class TestNotificationQueue {
 
         executeTest(testStartStop, queue, new WithTest() {
             @Override
-            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
 
                 final DateTime now = clock.getUTCNow();
                 final int MAX_NOTIFICATIONS = 100;
@@ -388,11 +388,11 @@ public class TestNotificationQueue {
     }
 
     private interface WithTest {
-        public void test(NotificationQueue readyQueue) throws InterruptedException;
+        public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
     }
 
     private void executeTest(final TestStartStop testStartStop,
-            NotificationQueue queue, WithTest test) throws InterruptedException{
+            DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
 
         queue.startQueue();
         boolean started = testStartStop.waitForStartComplete(3000);