killbill-memoizeit

Initial rework for notificationQ

11/19/2012 5:02:45 PM

Changes

Details

diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7e2d9cf..410fd97 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -59,6 +59,7 @@ import com.ning.billing.util.callcontext.DefaultCallContextFactory;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.clock.DefaultClock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
 import com.ning.billing.util.svcapi.account.AccountInternalApi;
 import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -75,6 +76,18 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
     private EntitlementUserApi entitlementUserApi;
     private BusinessTagDao tagDao;
 
+
+    private NotificationConfig config = new NotificationConfig() {
+        @Override
+        public boolean isNotificationProcessingOff() {
+            return false;
+        }
+        @Override
+        public long getSleepTimeMs() {
+            return 3000;
+        }
+    };
+
     @BeforeMethod(groups = "slow")
     public void setUp() throws Exception {
         final IDBI dbi = helper.getDBI();
@@ -92,7 +105,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
         accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao, accountEmailDao);
         final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
         final AddonUtils addonUtils = new AddonUtils(catalogService);
-        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+        final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
         final EntitlementDao entitlementDao = new AuditedEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
         final PlanAligner planAligner = new PlanAligner(catalogService);
         final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
     private final InternalCallContextFactory internalCallContextFactory;
     private final AccountInternalApi accountApi;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
 
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
         try {
             final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 081132e..e035ba0 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
  *
  * Ning licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index c84d1b0..fd6154d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -58,6 +58,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
     private final String hostname;
     private final InternalCallContextFactory internalCallContextFactory;
 
+    private volatile boolean isStarted;
+
     private static final class EventBusDelegate extends EventBus {
 
         public EventBusDelegate(final String busName) {
@@ -94,16 +96,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
         this.hostname = Hostname.get();
         this.internalCallContextFactory = internalCallContextFactory;
+        this.isStarted = false;
     }
 
     @Override
     public void start() {
         startQueue();
+        isStarted = true;
     }
 
     @Override
     public void stop() {
         stopQueue();
+        isStarted = false;
     }
 
     @Override
@@ -129,6 +134,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
         return result;
     }
 
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
     private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
         final Date now = clock.getUTCNow().toDate();
         final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 811846e..4fe0000 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,7 +55,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     public List<Notification> getReadyNotifications(@Bind("now") Date now,
                                                     @Bind("owner") String owner,
                                                     @Bind("max") int max,
-                                                    @Bind("queueName") String queueName,
                                                     @InternalTenantContextBinder final InternalTenantContext context);
 
     @SqlQuery
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index b20c1a7..461410d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,68 +17,55 @@
 package com.ning.billing.util.notificationq;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
-
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.util.Hostname;
 import com.ning.billing.util.config.NotificationConfig;
-import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
-public class DefaultNotificationQueue extends NotificationQueueBase {
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class DefaultNotificationQueue implements NotificationQueue {
 
     private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
 
     private final NotificationSqlDao dao;
-    private final InternalCallContextFactory internalCallContextFactory;
+    private final String hostname;
+
+    private final String svcName;
+    private final String queueName;
+
+    private final ObjectMapper objectMapper;
+
+    private final NotificationQueueHandler handler;
 
-    public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
-                                    final NotificationQueueHandler handler, final NotificationConfig config,
-                                    final InternalCallContextFactory internalCallContextFactory) {
-        super(clock, svcName, queueName, handler, config);
+    private final NotificationQueueService notificationQueueService;
+
+    private volatile boolean isStarted;
+
+    public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+                                    final IDBI dbi, final NotificationQueueService notificationQueueService) {
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
         this.dao = dbi.onDemand(NotificationSqlDao.class);
-        this.internalCallContextFactory = internalCallContextFactory;
+        this.hostname = Hostname.get();
+        this.notificationQueueService = notificationQueueService;
+        this.objectMapper = new ObjectMapper();
     }
 
-    @Override
-    public int doProcessEvents() {
-        logDebug("ENTER doProcessEvents");
-        // Finding and claiming notifications is not done per tenant (yet?)
-        final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
-        if (notifications.size() == 0) {
-            logDebug("EXIT doProcessEvents");
-            return 0;
-        }
-
-        logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
-        int result = 0;
-        for (final Notification cur : notifications) {
-            getNbProcessedEvents().incrementAndGet();
-            logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            result++;
-            clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
-            logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-        }
-
-        return result;
-    }
+
 
     @Override
     public void recordFutureNotification(final DateTime futureNotificationTime,
@@ -104,50 +91,12 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
                                                   final NotificationSqlDao thisDao,
                                                   final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
+        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
                                                                   accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
         thisDao.insertNotification(notification, context);
     }
 
-    private void clearNotification(final Notification cleared, final InternalCallContext context) {
-        dao.clearNotification(cleared.getId().toString(), getHostname(), context);
-    }
-
-    private List<Notification> getReadyNotifications(final InternalCallContext context) {
-        final Date now = getClock().getUTCNow().toDate();
-        final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
-        final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
-
-        final List<Notification> claimedNotifications = new ArrayList<Notification>();
-        for (final Notification cur : input) {
-            logDebug("about to claim notification %s,  key = %s for time %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-
-            final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
-            logDebug("claimed notification %s, key = %s for time %s result = %s",
-                     cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
-
-            if (claimed) {
-                claimedNotifications.add(cur);
-                dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
-            }
-        }
-
-        for (final Notification cur : claimedNotifications) {
-            if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
-                log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
-            }
-        }
-
-        return claimedNotifications;
-    }
 
-    private void logDebug(final String format, final Object... args) {
-        if (log.isDebugEnabled()) {
-            final String realDebug = String.format(format, args);
-            log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
-        }
-    }
 
     @Override
     public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
@@ -164,7 +113,41 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         dao.removeNotification(notificationId.toString(), context);
     }
 
-    private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
-        return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+    @Override
+    public String getFullQName() {
+        return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
+    }
+
+    @Override
+    public String getServiceName() {
+        return svcName;
     }
+
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
+
+    @Override
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        notificationQueueService.startQueue();
+        isStarted = true;
+    }
+
+    @Override
+    public void stopQueue() {
+        notificationQueueService.stopQueue();
+        isStarted = false;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
+    }
+
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..277cadf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 import com.google.inject.Inject;
 
 public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
     private final IDBI dbi;
-    private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
-    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
-        super(clock);
+    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationConfig config,
+                                           final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi, internalCallContextFactory);
         this.dbi = dbi;
-        this.internalCallContextFactory = internalCallContextFactory;
     }
 
+
     @Override
     protected NotificationQueue createNotificationQueueInternal(final String svcName,
                                                                 final String queueName,
-                                                                final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+                                                                final NotificationQueueHandler handler) {
+        return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 1444425..f5ab4bb 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -24,6 +24,7 @@ import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.queue.QueueLifecycle;
 
 public interface NotificationQueue extends QueueLifecycle {
@@ -67,13 +68,6 @@ public interface NotificationQueue extends QueueLifecycle {
     public void removeNotification(final UUID notificationId,
                                    final InternalCallContext context);
 
-    /**
-     * This is only valid when the queue has been configured with isNotificationProcessingOff is true
-     * In which case, it will callback users for all the ready notifications.
-     *
-     * @return the number of entries we processed
-     */
-    public int processReadyNotification();
 
     /**
      * @return the name of that queue
@@ -89,4 +83,6 @@ public interface NotificationQueue extends QueueLifecycle {
      * @return the queue name associated
      */
     public String getQueueName();
+
+    public NotificationQueueHandler getHandler();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d6df6dd..f5f493d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -19,8 +19,9 @@ package com.ning.billing.util.notificationq;
 import org.joda.time.DateTime;
 
 import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
 
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
 
     public interface NotificationQueueHandler {
 
@@ -89,8 +90,7 @@ public interface NotificationQueueService {
             throws NoSuchNotificationQueue;
 
     /**
-     * @param services
      * @return the number of processed notifications
      */
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+    public int triggerManualQueueProcessing(final Boolean keepRunning);
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..4ad413c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -22,26 +22,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
 import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
-    protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
 
-    protected final Clock clock;
-
-    private final Map<String, NotificationQueue> queues;
 
     @Inject
-    public NotificationQueueServiceBase(final Clock clock) {
-        this.clock = clock;
-        this.queues = new TreeMap<String, NotificationQueue>();
+    public NotificationQueueServiceBase(final Clock clock, final NotificationConfig config, final IDBI dbi,
+                                        final InternalCallContextFactory internalCallContextFactory) {
+        super(clock, config, dbi,  internalCallContextFactory);
     }
 
     @Override
@@ -61,7 +60,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
                 throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
                                                                        svcName, queueName));
             }
-            result = createNotificationQueueInternal(svcName, queueName, handler, config);
+            result = createNotificationQueueInternal(svcName, queueName, handler);
             queues.put(compositeName, result);
         }
         return result;
@@ -96,33 +95,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         }
     }
 
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("NotificationQueueServiceBase");
+        sb.append("{queues=").append(queues);
+        sb.append('}');
+        return sb.toString();
+    }
+
+
     //
     // Test ONLY
     //
     @Override
-    public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+    public int triggerManualQueueProcessing(final Boolean keepRunning) {
 
         int result = 0;
 
-        List<NotificationQueue> manualQueues = null;
-        if (services == null) {
-            manualQueues = new ArrayList<NotificationQueue>(queues.values());
-        } else {
-            final Joiner join = Joiner.on(",");
-            join.join(services);
-
-            log.info("Trigger manual processing for services {} ", join.toString());
-            manualQueues = new LinkedList<NotificationQueue>();
-            synchronized (queues) {
-                for (final String svc : services) {
-                    addQueuesForService(manualQueues, svc);
-                }
-            }
-        }
+        List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
         for (final NotificationQueue cur : manualQueues) {
             int processedNotifications = 0;
             do {
-                processedNotifications = cur.processReadyNotification();
+                doProcessEventsWithLimit(1);
                 log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
                 result += processedNotifications;
             } while (keepRunning && processedNotifications > 0);
@@ -130,28 +125,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
         return result;
     }
 
-    private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
-        for (final String cur : queues.keySet()) {
-            if (cur.startsWith(svcName)) {
-                result.add(queues.get(cur));
-            }
-        }
-    }
-
     protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
-                                                                         String queueName, NotificationQueueHandler handler,
-                                                                         NotificationConfig config);
-
-    public static String getCompositeName(final String svcName, final String queueName) {
-        return svcName + ":" + queueName;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("NotificationQueueServiceBase");
-        sb.append("{queues=").append(queues);
-        sb.append('}');
-        return sb.toString();
-    }
+                                                                         String queueName, NotificationQueueHandler handler);
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
 
     private final int nbThreads;
     private final Executor executor;
-    private final String svcName;
+    private final String svcQName;
     private final long sleepTimeMs;
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
     protected final ObjectMapper objectMapper;
-    
-    public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+    public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
-        this.svcName = svcName;
+        this.svcQName = svcQName;
         this.sleepTimeMs = config.getSleepTimeMs();
         this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
     }
 
+
     @Override
     public void startQueue() {
 
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
 
         log.info(String.format("%s: Starting with %d threads",
-                               svcName, nbThreads));
+                               svcQName, nbThreads));
 
         for (int i = 0; i < nbThreads; i++) {
             executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                 public void run() {
 
                     log.info(String.format("%s: Thread %s [%d] starting",
-                                           svcName,
+                                           svcQName,
                                            Thread.currentThread().getName(),
                                            Thread.currentThread().getId()));
 
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
                                 doProcessEvents();
                             } catch (Exception e) {
                                 log.warn(String.format("%s: Thread  %s  [%d] got an exception, catching and moving on...",
-                                                       svcName,
+                                                       svcQName,
                                                        Thread.currentThread().getName(),
                                                        Thread.currentThread().getId()), e);
                             }
                             sleepALittle();
                         }
                     } catch (InterruptedException e) {
-                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
                     } catch (Throwable e) {
-                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+                        log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
                     } finally {
 
-                        log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+                        log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
                         synchronized (thePersistentQ) {
                             curActiveThreads--;
                         }
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
             if (!success) {
 
-                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             } else {
-                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+                log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
             }
         } catch (InterruptedException e) {
-            log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+            log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
         }
     }
 
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             }
 
         } catch (InterruptedException ignore) {
-            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+            log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
         } finally {
             if (remaining > 0) {
-                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             } else {
-                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+                log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
             }
             curActiveThreads = 0;
         }
     }
-    
+
     protected <T> T deserializeEvent(final String className, final String json) {
         try {
             final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
         }
     }
 
+    public abstract int doProcessEvents();
 
+    public abstract boolean isStarted();
 
-
-    @Override
-    public abstract int doProcessEvents();
 }
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
      */
     public void stopQueue();
 
-    /**
-     * Processes event from queue
-     */
-    public int doProcessEvents();
+    public boolean isStarted();
 }
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 980d9c9..4e36ef9 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -131,7 +131,7 @@ CREATE TABLE notifications (
     PRIMARY KEY(record_id)
 ) ENGINE=innodb;
 CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
 CREATE INDEX  `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
 CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 8287ea4..70651e8 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -23,7 +23,6 @@ getReadyNotifications() ::= <<
     FORCE INDEX (idx_comp_where)
     where
       effective_date \<= :now
-      and queue_name = :queueName
       and processing_state != 'PROCESSED'
       and processing_state != 'REMOVED'
       and (processing_owner IS NULL OR processing_available_date \<= :now)
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 54be599..ea5c536 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -86,7 +86,7 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
 
         Thread.sleep(1000);
         final DateTime now = new DateTime();
-        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+        final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
         assertNotNull(notifications);
         assertEquals(notifications.size(), 1);
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 2a59bfd..bffb033 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
+import com.ning.billing.util.Hostname;
 import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
@@ -34,13 +35,27 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueue
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private final String hostname;
     private final TreeSet<Notification> notifications;
+    private final Clock clock;
+    private final String svcName;
+    private final String queueName;
+    private final NotificationQueueHandler handler;
+
+    private volatile boolean isStarted;
+
+    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler) {
+
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.clock = clock;
+        this.hostname = Hostname.get();
 
-    public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
-        super(clock, svcName, queueName, handler, config);
         notifications = new TreeSet<Notification>(new Comparator<Notification>() {
             @Override
             public int compare(final Notification o1, final Notification o2) {
@@ -57,7 +72,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
                                          final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
         final String json = objectMapper.writeValueAsString(notificationKey);
-        final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
+        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
                                                                   null, 0L);
         synchronized (notifications) {
             notifications.add(notification);
@@ -70,81 +85,107 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
         recordFutureNotification(futureNotificationTime, accountId, notificationKey, context);
     }
 
-    public List<Notification> getPendingEvents() {
-        final List<Notification> result = new ArrayList<Notification>();
 
+    @Override
+    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+        final List<Notification> toClearNotifications = new ArrayList<Notification>();
         for (final Notification notification : notifications) {
-            if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
-                result.add(notification);
+            if (notification.getNotificationKey().equals(key.toString())) {
+                toClearNotifications.add(notification);
             }
         }
 
-        return result;
+        synchronized (notifications) {
+            if (toClearNotifications.size() > 0) {
+                notifications.removeAll(toClearNotifications);
+            }
+        }
     }
 
     @Override
-    public int doProcessEvents() {
-        final int result;
-        final List<Notification> processedNotifications = new ArrayList<Notification>();
-        final List<Notification> oldNotifications = new ArrayList<Notification>();
+    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
 
-        final List<Notification> readyNotifications = new ArrayList<Notification>();
+        final List<Notification> result = new ArrayList<Notification>();
         synchronized (notifications) {
-            for (final Notification cur : notifications) {
-                if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
-                    readyNotifications.add(cur);
+            for (Notification cur : notifications) {
+                if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+                    result.add(cur);
                 }
             }
         }
+        return result;
+    }
 
-        result = readyNotifications.size();
-        for (final Notification cur : readyNotifications) {
-            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
-            getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
-            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
-                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
-                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
-                                                                                      cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
-                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
-            oldNotifications.add(cur);
-            processedNotifications.add(processedNotification);
-        }
-
+    @Override
+    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
         synchronized (notifications) {
-            if (oldNotifications.size() > 0) {
-                notifications.removeAll(oldNotifications);
-            }
-
-            if (processedNotifications.size() > 0) {
-                notifications.addAll(processedNotifications);
+            for (Notification cur : notifications) {
+                if (cur.getId().equals(notificationId)) {
+                    notifications.remove(cur);
+                    break;
+                }
             }
         }
 
-        return result;
     }
 
     @Override
-    public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
-        final List<Notification> toClearNotifications = new ArrayList<Notification>();
-        for (final Notification notification : notifications) {
-            if (notification.getNotificationKey().equals(key.toString())) {
-                toClearNotifications.add(notification);
-            }
-        }
+    public String getFullQName() {
+        return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+    }
 
-        synchronized (notifications) {
-            if (toClearNotifications.size() > 0) {
-                notifications.removeAll(toClearNotifications);
-            }
-        }
+    @Override
+    public String getServiceName() {
+        return svcName;
     }
 
     @Override
-    public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
-        return null;
+    public String getQueueName() {
+        return queueName;
     }
 
     @Override
-    public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+    public NotificationQueueHandler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void startQueue() {
+        isStarted = true;
+    }
+
+    @Override
+    public void stopQueue() {
+        isStarted = false;
+    }
+
+    @Override
+    public boolean isStarted() {
+        return isStarted;
     }
+
+
+    public List<Notification> getReadyNotifications() {
+        final int result;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        final List<Notification> readyNotifications = new ArrayList<Notification>();
+        synchronized (notifications) {
+            for (final Notification cur : notifications) {
+                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+                    readyNotifications.add(cur);
+                }
+            }
+        }
+        return readyNotifications;
+    }
+
+    public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded ) {
+        synchronized (notifications) {
+            notifications.removeAll(toBeremoved);
+            notifications.addAll(toBeAdded);
+        }
+    }
+
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..45acc5b 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
 
 package com.ning.billing.util.notificationq;
 
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
 
 import com.google.inject.Inject;
 
 public class MockNotificationQueueService extends NotificationQueueServiceBase {
 
     @Inject
-    public MockNotificationQueueService(final Clock clock) {
-        super(clock);
+    public MockNotificationQueueService(final Clock clock, final NotificationConfig config) {
+        super(clock, config, null, null);
+    }
+
+
+    @Override
+    protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+                                                                final NotificationQueueHandler handler) {
+        return new MockNotificationQueue(clock, svcName, queueName, handler);
     }
 
+
     @Override
-    protected NotificationQueue createNotificationQueueInternal(final String svcName,
-                                                                final String queueName, final NotificationQueueHandler handler,
-                                                                final NotificationConfig config) {
-        return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+    public int doProcessEvents() {
+
+        int result = 0;
+
+        for (NotificationQueue cur : queues.values()) {
+            result += doProcessEventsForQueue((MockNotificationQueue) cur);
+        }
+        return result;
+    }
+
+    private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+        int result = 0;
+        final List<Notification> processedNotifications = new ArrayList<Notification>();
+        final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        List<Notification> readyNotifications = queue.getReadyNotifications();
+        for (final Notification cur : readyNotifications) {
+            final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+            queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
+            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+                                                                                      "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+                                                                                      PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+                                                                                      cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
+                                                                                      cur.getAccountRecordId(), cur.getTenantRecordId());
+            oldNotifications.add(cur);
+            processedNotifications.add(processedNotification);
+            result++;
+        }
+
+        queue.markProcessedNotifications(oldNotifications, processedNotifications);
+        return result;
     }
 }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index ee19da5..2faa969 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -38,12 +38,12 @@ import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.NotificationConfig;
 import com.ning.billing.util.io.IOUtils;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -62,6 +62,7 @@ import static org.testng.Assert.assertEquals;
 
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
+
     private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
 
     private static final UUID accountId = UUID.randomUUID();
@@ -75,11 +76,18 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     @Inject
     private Clock clock;
 
+    @Inject
+    NotificationQueueService queueService;
+
+    @Inject
+    NotificationConfig config;
+
     private DummySqlTest dao;
 
     private int eventsReceived;
 
     private static final class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
+
         private final String value;
 
         @JsonCreator
@@ -96,6 +104,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         public int compareTo(TestNotificationKey arg0) {
             return value.compareTo(arg0.value);
         }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder();
+            sb.append(value);
+            return sb.toString();
+        }
     }
 
     @BeforeSuite(groups = "slow")
@@ -133,20 +148,21 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
 
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        log.info("Handler received key: " + notificationKey);
-
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 1, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "foo",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey);
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             },
+                                                                             config);
+
 
         queue.startQueue();
 
@@ -188,21 +204,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
     }
 
     @Test(groups = "slow")
-    public void testManyNotifications() throws InterruptedException {
+    public void testManyNotifications() throws Exception {
         final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
+
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "many",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     synchronized (expectedNotifications) {
+                                                                                         log.info("Handler received key: " + notificationKey.toString());
+
+                                                                                         expectedNotifications.put(notificationKey, Boolean.TRUE);
+                                                                                         expectedNotifications.notify();
+                                                                                     }
+                                                                                 }
+                                                                             },
+                                                                             config);
 
         queue.startQueue();
 
@@ -216,7 +235,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             final DummyObject obj = new DummyObject("foo", key);
             final int currentIteration = i;
 
-            final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+            final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
             expectedNotifications.put(notificationKey, Boolean.FALSE);
 
             dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@@ -255,7 +274,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
                     success = true;
                     break;
                 }
-                //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+                log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
                 expectedNotifications.wait(1000);
             }
         } while (nbTry-- > 0);
@@ -281,19 +300,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
         final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
 
-        final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
 
-        final NotificationConfig config = new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return false;
-            }
+        final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, new InternalCallContextFactory(dbi, clock));
 
-            @Override
-            public long getSleepTimeMs() {
-                return 10;
-            }
-        };
 
         final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
             @Override
@@ -367,39 +376,29 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
     }
 
-    NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
-        return new NotificationConfig() {
-            @Override
-            public boolean isNotificationProcessingOff() {
-                return off;
-            }
-
-            @Override
-            public long getSleepTimeMs() {
-                return sleepTime;
-            }
-        };
-    }
 
     @Test(groups = "slow")
-    public void testRemoveNotifications() throws InterruptedException {
+    public void testRemoveNotifications() throws Exception  {
         final UUID key = UUID.randomUUID();
         final NotificationKey notificationKey = new TestNotificationKey(key.toString());
         final UUID key2 = UUID.randomUUID();
         final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
 
-        final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
-                                                                                    if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
-                                                                                        log.info("Received notification with key: " + notificationKey);
-                                                                                        eventsReceived++;
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000),
-                                                                            new InternalCallContextFactory(dbi, clock));
+
+        final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+                                                                             "remove",
+                                                                             new NotificationQueueHandler() {
+                                                                                 @Override
+                                                                                 public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+                                                                                     if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+                                                                                         log.info("Received notification with key: " + notificationKey);
+                                                                                         eventsReceived++;
+                                                                                     }
+                                                                                 }
+                                                                             },
+                                                                             config);
+
+
 
         queue.startQueue();
 
@@ -440,10 +439,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
         queue.stopQueue();
     }
 
+
+    static NotificationConfig getNotificationConfig(final boolean off, final long sleepTime) {
+        return new NotificationConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return off;
+            }
+
+            @Override
+            public long getSleepTimeMs() {
+                return sleepTime;
+            }
+        };
+    }
+
     public static class TestNotificationQueueModule extends AbstractModule {
+
         @Override
         protected void configure() {
-            bind(Clock.class).to(ClockMock.class);
+            bind(Clock.class).to(ClockMock.class).asEagerSingleton();
 
             final MysqlTestingHelper helper = KillbillTestSuiteWithEmbeddedDB.getMysqlTestingHelper();
             bind(MysqlTestingHelper.class).toInstance(helper);
@@ -451,6 +466,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
             bind(IDBI.class).toInstance(dbi);
             final IDBI otherDbi = helper.getDBI();
             bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+            bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+            bind(NotificationConfig.class).toInstance(getNotificationConfig(false, 100));
         }
     }
 }