killbill-aplcache

Missing updates from previous commit

6/18/2012 11:54:08 PM

Changes

entitlement/src/test/java/com/ning/billing/entitlement/engine/core/TestEntitlementNotificationKey.java 45(+0 -45)

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3e8c025..a4a926e 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.swing.text.html.HTMLDocument.HTMLReader.IsindexAction;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +61,7 @@ import com.ning.billing.util.callcontext.CallContextFactory;
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.UserType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -87,11 +90,11 @@ public class Engine implements EventListener, EntitlementService {
 
     @Inject
     public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
-                  final EntitlementConfig config,
-                  final AddonUtils addonUtils, final Bus eventBus,
-                  final NotificationQueueService notificationQueueService,
-                  final SubscriptionFactory subscriptionFactory,
-                  final CallContextFactory factory) {
+            final EntitlementConfig config,
+            final AddonUtils addonUtils, final Bus eventBus,
+            final NotificationQueueService notificationQueueService,
+            final SubscriptionFactory subscriptionFactory,
+            final CallContextFactory factory) {
         super();
         this.clock = clock;
         this.dao = dao;
@@ -114,35 +117,40 @@ public class Engine implements EventListener, EntitlementService {
 
         try {
             subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
-                                                                                      NOTIFICATION_QUEUE_NAME,
-                                                                                      new NotificationQueueHandler() {
-                                                                                          @Override
-                                                                                          public void handleReadyNotification(final String inputKey, final DateTime eventDateTime) {
-
-                                                                                              final EntitlementNotificationKey key = new EntitlementNotificationKey(inputKey);
-                                                                                              final EntitlementEvent event = dao.getEventById(key.getEventId());
-                                                                                              if (event == null) {
-                                                                                                  log.warn("Failed to extract event for notification key {}", inputKey);
-                                                                                                  return;
-                                                                                              }
-                                                                                              final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
-                                                                                              final CallContext context = factory.createCallContext("SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
-                                                                                              processEventReady(event, key.getSeqId(), context);
-                                                                                          }
-                                                                                      },
-                                                                                      new NotificationConfig() {
-
-                                                                                          @Override
-                                                                                          public long getSleepTimeMs() {
-                                                                                              return config.getSleepTimeMs();
-                                                                                          }
-
-                                                                                          @Override
-                                                                                          public boolean isNotificationProcessingOff() {
-                                                                                              return config.isNotificationProcessingOff();
-                                                                                          }
-                                                                                      }
-                                                                                     );
+                    NOTIFICATION_QUEUE_NAME,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+
+                    if (! (inputKey instanceof EntitlementNotificationKey)) {
+                        log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
+                        return;
+                    }
+                    EntitlementNotificationKey key = (EntitlementNotificationKey) inputKey;
+                    
+                     final EntitlementEvent event = dao.getEventById(key.getEventId());
+                    if (event == null) {
+                        log.warn("Failed to extract event for notification key {}", inputKey);
+                        return;
+                    }
+                    final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
+                    final CallContext context = factory.createCallContext("SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+                    processEventReady(event, key.getSeqId(), context);
+                }
+            },
+            new NotificationConfig() {
+
+                @Override
+                public long getSleepTimeMs() {
+                    return config.getSleepTimeMs();
+                }
+
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isNotificationProcessingOff();
+                }
+            }
+            );
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
@@ -202,10 +210,10 @@ public class Engine implements EventListener, EntitlementService {
             final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, now);
             final PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
                     PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
-                    null;
-            if (nextPhaseEvent != null) {
-                dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent, context);
-            }
+                        null;
+                    if (nextPhaseEvent != null) {
+                        dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent, context);
+                    }
         } catch (EntitlementError e) {
             log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
         }
@@ -238,13 +246,13 @@ public class Engine implements EventListener, EntitlementService {
                 // Perform AO cancellation using the effectiveDate of the BP
                 //
                 final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                                                                          .setSubscriptionId(cur.getId())
-                                                                          .setActiveVersion(cur.getActiveVersion())
-                                                                          .setProcessedDate(now)
-                                                                          .setEffectiveDate(event.getEffectiveDate())
-                                                                          .setRequestedDate(now)
-                                                                          .setUserToken(context.getUserToken())
-                                                                          .setFromDisk(true));
+                .setSubscriptionId(cur.getId())
+                .setActiveVersion(cur.getActiveVersion())
+                .setProcessedDate(now)
+                .setEffectiveDate(event.getEffectiveDate())
+                .setRequestedDate(now)
+                .setUserToken(context.getUserToken())
+                .setFromDisk(true));
 
                 addOnCancellations.put(cur.getId(), cancelEvent);
             }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
index ee35526..1327a72 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
@@ -17,16 +17,19 @@ package com.ning.billing.entitlement.engine.core;
 
 import java.util.UUID;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.ning.billing.util.notificationq.NotificationKey;
 
 public class EntitlementNotificationKey implements NotificationKey {
 
-    private static final String DELIMITER = ":";
-
     private final UUID eventId;
     private final int seqId;
 
-    public EntitlementNotificationKey(final UUID eventId, final int seqId) {
+    
+    @JsonCreator
+    public EntitlementNotificationKey(@JsonProperty("eventId") final UUID eventId,
+            @JsonProperty("seqId") final int seqId) {
         this.eventId = eventId;
         this.seqId = seqId;
     }
@@ -35,17 +38,6 @@ public class EntitlementNotificationKey implements NotificationKey {
         this(eventId, 0);
     }
 
-    public EntitlementNotificationKey(final String input) {
-
-        final String[] parts = input.split(DELIMITER);
-        eventId = UUID.fromString(parts[0]);
-        if (parts.length == 2) {
-            seqId = Integer.valueOf(parts[1]);
-        } else {
-            seqId = 0;
-        }
-    }
-
     public UUID getEventId() {
         return eventId;
     }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index fab86f4..7316c3b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -17,6 +17,8 @@
 package com.ning.billing.entitlement.engine.dao;
 
 import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -89,8 +91,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
     @Inject
     public AuditedEntitlementDao(final IDBI dbi, final Clock clock,
-                                 final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
-                                 final Bus eventBus) {
+            final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
+            final Bus eventBus) {
 
         this.clock = clock;
         this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
@@ -188,7 +190,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
             @Override
             public Void inTransaction(final SubscriptionSqlDao transactionalDao,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
                 final String subscriptionId = subscription.getId().toString();
                 transactionalDao.updateChargedThroughDate(subscription.getId().toString(), ctd, context);
                 final Long subscriptionRecordId = transactionalDao.getRecordId(subscriptionId);
@@ -220,8 +222,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 transactional.insertAuditFromTransaction(audit, context);
 
                 recordFutureNotificationFromTransaction(transactional,
-                                                        nextPhase.getEffectiveDate(),
-                                                        new EntitlementNotificationKey(nextPhase.getId()));
+                        nextPhase.getEffectiveDate(),
+                        new EntitlementNotificationKey(nextPhase.getId()));
                 return null;
             }
         });
@@ -242,7 +244,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         return subscriptionsDao.inTransaction(new Transaction<Map<UUID, List<EntitlementEvent>>, SubscriptionSqlDao>() {
             @Override
             public Map<UUID, List<EntitlementEvent>> inTransaction(final SubscriptionSqlDao transactional,
-                                                                   final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
                 final List<Subscription> subscriptions = transactional.getSubscriptionsFromBundleId(bundleId.toString());
                 if (subscriptions.size() == 0) {
                     return Collections.emptyMap();
@@ -266,13 +268,13 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
     @Override
     public void createSubscription(final SubscriptionData subscription,
-                                   final List<EntitlementEvent> initialEvents, final CallContext context) {
+            final List<EntitlementEvent> initialEvents, final CallContext context) {
 
         subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
 
             @Override
             public Void inTransaction(final SubscriptionSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 transactional.insertSubscription(subscription, context);
                 final Long subscriptionRecordId = transactional.getRecordId(subscription.getId().toString());
@@ -288,8 +290,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     final Long recordId = eventsDaoFromSameTransaction.getRecordId(cur.getId().toString());
                     audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
                     recordFutureNotificationFromTransaction(transactional,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()));
+                            cur.getEffectiveDate(),
+                            new EntitlementNotificationKey(cur.getId()));
                 }
 
                 eventsDaoFromSameTransaction.insertAuditFromTransaction(audits, context);
@@ -300,12 +302,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
     @Override
     public void recreateSubscription(final UUID subscriptionId,
-                                     final List<EntitlementEvent> recreateEvents, final CallContext context) {
+            final List<EntitlementEvent> recreateEvents, final CallContext context) {
 
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 final List<EntityAudit> audits = new ArrayList<EntityAudit>();
                 for (final EntitlementEvent cur : recreateEvents) {
@@ -313,8 +315,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     final Long recordId = transactional.getRecordId(cur.getId().toString());
                     audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
                     recordFutureNotificationFromTransaction(transactional,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()));
+                            cur.getEffectiveDate(),
+                            new EntitlementNotificationKey(cur.getId()));
 
                 }
 
@@ -330,7 +332,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
         eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
                 cancelNextCancelEventFromTransaction(subscriptionId, transactional, context);
                 cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
                 cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
@@ -342,8 +344,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 transactional.insertAuditFromTransaction(audit, context);
 
                 recordFutureNotificationFromTransaction(transactional,
-                                                        cancelEvent.getEffectiveDate(),
-                                                        new EntitlementNotificationKey(cancelEvent.getId(), seqId));
+                        cancelEvent.getEffectiveDate(),
+                        new EntitlementNotificationKey(cancelEvent.getId(), seqId));
                 return null;
             }
         });
@@ -356,7 +358,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 EntitlementEvent cancelledEvent = null;
                 final Date now = clock.getUTCNow().toDate();
@@ -384,8 +386,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                         final Long recordId = transactional.getRecordId(cur.getId().toString());
                         eventAudits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
                         recordFutureNotificationFromTransaction(transactional,
-                                                                cur.getEffectiveDate(),
-                                                                new EntitlementNotificationKey(cur.getId()));
+                                cur.getEffectiveDate(),
+                                new EntitlementNotificationKey(cur.getId()));
                     }
 
                     transactional.insertAuditFromTransaction(eventAudits, context);
@@ -410,8 +412,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     eventAudits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
 
                     recordFutureNotificationFromTransaction(transactional,
-                                                            cur.getEffectiveDate(),
-                                                            new EntitlementNotificationKey(cur.getId()));
+                            cur.getEffectiveDate(),
+                            new EntitlementNotificationKey(cur.getId()));
                 }
 
                 transactional.insertAuditFromTransaction(eventAudits, context);
@@ -433,8 +435,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
     }
 
     private void cancelFutureEventFromTransaction(final UUID subscriptionId, final EntitlementEventSqlDao dao,
-                                                  final EventType type, @Nullable final ApiEventType apiType,
-                                                  final CallContext context) {
+            final EventType type, @Nullable final ApiEventType apiType,
+            final CallContext context) {
 
         EntitlementEvent futureEvent = null;
         final Date now = clock.getUTCNow().toDate();
@@ -445,7 +447,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 if (futureEvent != null) {
                     throw new EntitlementError(
                             String.format("Found multiple future events for type %s for subscriptions %s",
-                                          type, subscriptionId.toString()));
+                                    type, subscriptionId.toString()));
                 }
                 futureEvent = cur;
             }
@@ -504,46 +506,46 @@ public class AuditedEntitlementDao implements EntitlementDao {
             Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
 
             switch (cur.getCategory()) {
-                case BASE:
-                    final Collection<EntitlementEvent> futureApiEvents = Collections2.filter(events, new Predicate<EntitlementEvent>() {
-                        @Override
-                        public boolean apply(final EntitlementEvent input) {
-                            return (input.getEffectiveDate().isAfter(clock.getUTCNow()) &&
-                                    ((input instanceof ApiEventCancel) || (input instanceof ApiEventChange)));
-                        }
-                    });
-                    futureBaseEvent = (futureApiEvents.size() == 0) ? null : futureApiEvents.iterator().next();
-                    break;
-
-                case ADD_ON:
-                    final Plan targetAddOnPlan = reloaded.getCurrentPlan();
-                    final String baseProductName = (futureBaseEvent instanceof ApiEventChange) ?
-                            ((ApiEventChange) futureBaseEvent).getEventPlan() : null;
-
-                    final boolean createCancelEvent = (futureBaseEvent != null) &&
-                            ((futureBaseEvent instanceof ApiEventCancel) ||
-                                    ((!addonUtils.isAddonAvailableFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan)) ||
-                                            (addonUtils.isAddonIncludedFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan))));
-
-                    if (createCancelEvent) {
-                        final DateTime now = clock.getUTCNow();
-                        final EntitlementEvent addOnCancelEvent = new ApiEventCancel(new ApiEventBuilder()
-                                                                                       .setSubscriptionId(reloaded.getId())
-                                                                                       .setActiveVersion(((SubscriptionData) reloaded).getActiveVersion())
-                                                                                       .setProcessedDate(now)
-                                                                                       .setEffectiveDate(futureBaseEvent.getEffectiveDate())
-                                                                                       .setRequestedDate(now)
-                                                                                               // This event is only there to indicate the ADD_ON is future canceled, but it is not there
-                                                                                               // on disk until the base plan cancellation becomes effective
-                                                                                       .setFromDisk(false));
-
-                        events.add(addOnCancelEvent);
-                        // Finally reload subscription with full set of events
-                        reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+            case BASE:
+                final Collection<EntitlementEvent> futureApiEvents = Collections2.filter(events, new Predicate<EntitlementEvent>() {
+                    @Override
+                    public boolean apply(final EntitlementEvent input) {
+                        return (input.getEffectiveDate().isAfter(clock.getUTCNow()) &&
+                                ((input instanceof ApiEventCancel) || (input instanceof ApiEventChange)));
                     }
-                    break;
-                default:
-                    break;
+                });
+                futureBaseEvent = (futureApiEvents.size() == 0) ? null : futureApiEvents.iterator().next();
+                break;
+
+            case ADD_ON:
+                final Plan targetAddOnPlan = reloaded.getCurrentPlan();
+                final String baseProductName = (futureBaseEvent instanceof ApiEventChange) ?
+                        ((ApiEventChange) futureBaseEvent).getEventPlan() : null;
+
+                        final boolean createCancelEvent = (futureBaseEvent != null) &&
+                        ((futureBaseEvent instanceof ApiEventCancel) ||
+                                ((!addonUtils.isAddonAvailableFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan)) ||
+                                        (addonUtils.isAddonIncludedFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan))));
+
+                        if (createCancelEvent) {
+                            final DateTime now = clock.getUTCNow();
+                            final EntitlementEvent addOnCancelEvent = new ApiEventCancel(new ApiEventBuilder()
+                            .setSubscriptionId(reloaded.getId())
+                            .setActiveVersion(((SubscriptionData) reloaded).getActiveVersion())
+                            .setProcessedDate(now)
+                            .setEffectiveDate(futureBaseEvent.getEffectiveDate())
+                            .setRequestedDate(now)
+                            // This event is only there to indicate the ADD_ON is future canceled, but it is not there
+                            // on disk until the base plan cancellation becomes effective
+                            .setFromDisk(false));
+
+                            events.add(addOnCancelEvent);
+                            // Finally reload subscription with full set of events
+                            reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+                        }
+                        break;
+            default:
+                break;
             }
 
             result.add(reloaded);
@@ -558,7 +560,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
             @Override
             public Void inTransaction(final EntitlementEventSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 final SubscriptionSqlDao transSubDao = transactional.become(SubscriptionSqlDao.class);
                 final BundleSqlDao transBundleDao = transactional.become(BundleSqlDao.class);
@@ -579,8 +581,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                             audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
 
                             recordFutureNotificationFromTransaction(transactional,
-                                                                    curEvent.getEffectiveDate(),
-                                                                    new EntitlementNotificationKey(curEvent.getId()));
+                                    curEvent.getEffectiveDate(),
+                                    new EntitlementNotificationKey(curEvent.getId()));
                         }
                         transSubDao.insertSubscription(subData, context);
                         recordId = transSubDao.getRecordId(subData.getId().toString());
@@ -604,7 +606,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
             @Override
             public Void inTransaction(final SubscriptionSqlDao transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 final EntitlementEventSqlDao transEventDao = transactional.become(EntitlementEventSqlDao.class);
                 for (final SubscriptionDataRepair cur : inRepair) {
@@ -616,8 +618,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
                         transEventDao.insertEvent(event, context);
                         if (event.getEffectiveDate().isAfter(clock.getUTCNow())) {
                             recordFutureNotificationFromTransaction(transactional,
-                                                                    event.getEffectiveDate(),
-                                                                    new EntitlementNotificationKey(event.getId()));
+                                    event.getEffectiveDate(),
+                                    new EntitlementNotificationKey(event.getId()));
                         }
                     }
                 }
@@ -645,10 +647,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
     private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
         try {
             final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
-                                                                                                     Engine.NOTIFICATION_QUEUE_NAME);
+                    Engine.NOTIFICATION_QUEUE_NAME);
             subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 5b905ac..ec5c17e 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.entitlement.engine.dao;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -46,6 +47,7 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundle;
 import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.user.ApiEvent;
@@ -159,12 +161,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
         synchronized (events) {
             events.addAll(initialEvents);
             for (final EntitlementEvent cur : initialEvents) {
-                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return cur.getId().toString();
-                    }
-                });
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
             }
         }
         final Subscription updatedSubscription = buildSubscription(null, subscription);
@@ -178,12 +175,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
         synchronized (events) {
             events.addAll(recreateEvents);
             for (final EntitlementEvent cur : recreateEvents) {
-                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return cur.getId().toString();
-                    }
-                });
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
             }
         }
     }
@@ -295,12 +287,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             cancelNextPhaseEvent(subscriptionId);
             events.addAll(changeEvents);
             for (final EntitlementEvent cur : changeEvents) {
-                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return cur.getId().toString();
-                    }
-                });
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
             }
         }
     }
@@ -308,12 +295,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     private void insertEvent(final EntitlementEvent event) {
         synchronized (events) {
             events.add(event);
-            recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
-                @Override
-                public String toString() {
-                    return event.getId().toString();
-                }
-            });
+            recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new EntitlementNotificationKey(event.getId()));
         }
     }
 
@@ -403,12 +385,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                     final SubscriptionData subData = curSubscription.getData();
                     for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                         events.add(curEvent);
-                        recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
-                            @Override
-                            public String toString() {
-                                return curEvent.getId().toString();
-                            }
-                        });
+                        recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new EntitlementNotificationKey(curEvent.getId()));
 
                     }
                     subscriptions.add(subData);
@@ -437,6 +414,8 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
         } catch (NoSuchNotificationQueue e) {
             throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);            
         }
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 534ee0f..5c03f01 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -30,6 +30,7 @@ import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
 import com.ning.billing.entitlement.api.user.Subscription;
 import com.ning.billing.invoice.InvoiceListener;
 import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -52,7 +53,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
 
     @Inject
     public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
-                                          final InvoiceConfig config, final EntitlementUserApi entitlementUserApi, final InvoiceListener listener) {
+            final InvoiceConfig config, final EntitlementUserApi entitlementUserApi, final InvoiceListener listener) {
         this.notificationQueueService = notificationQueueService;
         this.config = config;
         this.entitlementUserApi = entitlementUserApi;
@@ -62,41 +63,46 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
     @Override
     public void initialize() throws NotificationQueueAlreadyExists {
         nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
-                                                                            NEXT_BILLING_DATE_NOTIFIER_QUEUE,
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final String notificationKey, final DateTime eventDate) {
-                                                                                    try {
-                                                                                        final UUID key = UUID.fromString(notificationKey);
-                                                                                        try {
-                                                                                            final Subscription subscription = entitlementUserApi.getSubscriptionFromId(key);
-                                                                                            if (subscription == null) {
-                                                                                                log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
-                                                                                            } else {
-                                                                                                processEvent(key, eventDate);
-                                                                                            }
-                                                                                        } catch (EntitlementUserApiException e) {
-                                                                                            log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
-                                                                                        }
-                                                                                    } catch (IllegalArgumentException e) {
-                                                                                        log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
-                                                                                    }
-
-                                                                                }
-                                                                            },
-                                                                            new NotificationConfig() {
-
-                                                                                @Override
-                                                                                public long getSleepTimeMs() {
-                                                                                    return config.getSleepTimeMs();
-                                                                                }
-
-                                                                                @Override
-                                                                                public boolean isNotificationProcessingOff() {
-                                                                                    return config.isNotificationProcessingOff();
-                                                                                }
-                                                                            }
-                                                                           );
+                NEXT_BILLING_DATE_NOTIFIER_QUEUE,
+                new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+                try {
+                    
+                    if (! (notificationKey instanceof NextBillingDateNotificationKey)) {
+                        log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
+                        return;
+                    }
+                    final NextBillingDateNotificationKey key = (NextBillingDateNotificationKey) notificationKey;
+                    try {
+                        final Subscription subscription = entitlementUserApi.getSubscriptionFromId(key.getUuidKey());
+                        if (subscription == null) {
+                            log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
+                        } else {
+                            processEvent(key.getUuidKey(), eventDate);
+                        }
+                    } catch (EntitlementUserApiException e) {
+                        log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
+                    }
+                } catch (IllegalArgumentException e) {
+                    log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+                }
+
+            }
+        },
+        new NotificationConfig() {
+
+            @Override
+            public long getSleepTimeMs() {
+                return config.getSleepTimeMs();
+            }
+
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return config.isNotificationProcessingOff();
+            }
+        }
+        );
 
     }
 
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index ffc08a3..f2521de 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.invoice.notification;
 
+import java.io.IOException;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -51,14 +52,11 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
             log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
 
-            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey() {
-                @Override
-                public String toString() {
-                    return subscriptionId.toString();
-                }
-            });
+            nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NextBillingDateNotificationKey(subscriptionId));
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
+        } catch (IOException e) {
+            log.error("Failed to serialize notficationKey for subscriptionId {}", subscriptionId);            
         }
     }
 }
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 1695aa4..0d58473 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -27,6 +27,7 @@ import com.ning.billing.config.NotificationConfig;
 import com.ning.billing.overdue.OverdueProperties;
 import com.ning.billing.overdue.listener.OverdueListener;
 import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationKey;
 import com.ning.billing.util.notificationq.NotificationQueue;
 import com.ning.billing.util.notificationq.NotificationQueueService;
 import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -47,7 +48,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
 
     @Inject
     public DefaultOverdueCheckNotifier(final NotificationQueueService notificationQueueService,
-                                       final OverdueProperties config, final OverdueListener listener) {
+            final OverdueProperties config, final OverdueListener listener) {
         this.notificationQueueService = notificationQueueService;
         this.config = config;
         this.listener = listener;
@@ -57,32 +58,36 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
     public void initialize() {
         try {
             overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
-                                                                            OVERDUE_CHECK_NOTIFIER_QUEUE,
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final String notificationKey, final DateTime eventDate) {
-                                                                                    try {
-                                                                                        final UUID key = UUID.fromString(notificationKey);
-                                                                                        processEvent(key, eventDate);
-                                                                                    } catch (IllegalArgumentException e) {
-                                                                                        log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
-                                                                                        return;
-                                                                                    }
-
-                                                                                }
-                                                                            },
-                                                                            new NotificationConfig() {
-                                                                                @Override
-                                                                                public boolean isNotificationProcessingOff() {
-                                                                                    return config.isNotificationProcessingOff();
-                                                                                }
-
-                                                                                @Override
-                                                                                public long getSleepTimeMs() {
-                                                                                    return config.getSleepTimeMs();
-                                                                                }
-                                                                            }
-                                                                           );
+                    OVERDUE_CHECK_NOTIFIER_QUEUE,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+                    try {
+                        if (! (notificationKey instanceof OverdueCheckNotificationKey)) {
+                            log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
+                            return;
+                        }
+                        final OverdueCheckNotificationKey key = (OverdueCheckNotificationKey) notificationKey; 
+                        processEvent(key.getUuidKey(), eventDate);
+                    } catch (IllegalArgumentException e) {
+                        log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+                        return;
+                    }
+
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isNotificationProcessingOff();
+                }
+
+                @Override
+                public long getSleepTimeMs() {
+                    return config.getSleepTimeMs();
+                }
+            }
+            );
         } catch (NotificationQueueAlreadyExists e) {
             throw new RuntimeException(e);
         }
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 33c1328..79cfdf9 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.ovedue.notification;
 
+import java.io.IOException;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,16 +50,12 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
                                                                               DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
             log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
 
-            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new NotificationKey() {
-                @Override
-                public String toString() {
-                    return overdueable.getId().toString();
-                }
-            });
+            checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId()));
         } catch (NoSuchNotificationQueue e) {
             log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
+        } catch (IOException e) {
+            log.error("Failed to serialize notifcationKey for {}", overdueable.toString());            
         }
-
     }
 
 
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index f8c4498..292583e 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -15,6 +15,7 @@
  */
 package com.ning.billing.payment.retry;
 
+import java.io.IOException;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -43,7 +44,7 @@ public abstract class BaseRetryService implements RetryService {
     private NotificationQueue retryQueue;
 
     public BaseRetryService(final NotificationQueueService notificationQueueService,
-                            final Clock clock, final PaymentConfig config) {
+            final Clock clock, final PaymentConfig config) {
         this.notificationQueueService = notificationQueueService;
         final Clock clock1 = clock;
         this.config = config;
@@ -54,13 +55,18 @@ public abstract class BaseRetryService implements RetryService {
     public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
         retryQueue = notificationQueueService.createNotificationQueue(svcName, getQueueName(), new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
-                retry(UUID.fromString(notificationKey));
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+                if (! (notificationKey instanceof PaymentRetryNotificationKey)) {
+                    log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
+                    return;
+                }
+                final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
+                retry(key.getUuidKey());
             }
         },
-                                                                      config);
+        config);
     }
-
+    
     @Override
     public void start() {
         retryQueue.startQueue();
@@ -109,19 +115,14 @@ public abstract class BaseRetryService implements RetryService {
             } catch (NoSuchNotificationQueue e) {
                 log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
             }
-            */
+             */
         }
 
         private boolean scheduleRetryInternal(final UUID paymentId, final DateTime timeOfRetry, final Transmogrifier transactionalDao) {
 
             try {
                 final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, getQueueName());
-                final NotificationKey key = new NotificationKey() {
-                    @Override
-                    public String toString() {
-                        return paymentId.toString();
-                    }
-                };
+                final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
                 if (retryQueue != null) {
                     if (transactionalDao == null) {
                         retryQueue.recordFutureNotification(timeOfRetry, key);
@@ -132,6 +133,9 @@ public abstract class BaseRetryService implements RetryService {
             } catch (NoSuchNotificationQueue e) {
                 log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
                 return false;
+            } catch (IOException e) {
+                log.error(String.format("Failed to serialize notificationQueue event for paymentId %s", paymentId));
+                return false;
             }
             return true;
         }
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index d89b07f..b315e79 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -46,7 +46,7 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
 
     private final PersistentBusSqlDao dao;
 
-    private final ObjectMapper objectMapper = new ObjectMapper();
+
     private final EventBusDelegate eventBusDelegate;
     private final Clock clock;
     private final String hostname;
@@ -108,7 +108,7 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
 
         int result = 0;
         for (final BusEventEntry cur : events) {
-            final BusEvent evt = deserializeBusEvent(cur.getBusEventClass(), cur.getBusEventJson());
+            final BusEvent evt = deserializeEvent(cur.getBusEventClass(), cur.getBusEventJson());
             result++;
             // STEPH exception handling is done by GUAVA-- logged a bug Issue-780
             eventBusDelegate.post(evt);
@@ -117,16 +117,6 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
         return result;
     }
 
-    private BusEvent deserializeBusEvent(final String className, final String json) {
-        try {
-            final Class<?> claz = Class.forName(className);
-            return (BusEvent) objectMapper.readValue(json, claz);
-        } catch (Exception e) {
-            log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
-            return null;
-        }
-    }
-
 
     private List<BusEventEntry> getNextBusEvent() {
 
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index f5a5f62..dc3ee4d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -73,6 +73,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("id", evt.getId().toString());
             stmt.bind("createdDate", getDate(new DateTime()));
             stmt.bind("creatingOwner", evt.getCreatedOwner());
+            stmt.bind("className", evt.getNotificationKeyClass());            
             stmt.bind("notificationKey", evt.getNotificationKey());
             stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
             stmt.bind("queueName", evt.getQueueName());
@@ -91,6 +92,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final Long ordering = r.getLong("record_id");
             final UUID id = getUUID(r, "id");
             final String createdOwner = r.getString("creating_owner");
+            final String className = r.getString("class_name");            
             final String notificationKey = r.getString("notification_key");
             final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDate(r, "effective_date");
@@ -99,7 +101,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
 
             return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
-                                           processingState, notificationKey, effectiveDate);
+                                           processingState, className, notificationKey, effectiveDate);
 
         }
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index fe9b7e2..1d74210 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -29,13 +29,14 @@ public class DefaultNotification extends EntityBase implements Notification {
     private final String queueName;
     private final DateTime nextAvailableDate;
     private final PersistentQueueEntryLifecycleState lifecycleState;
+    private final String notificationKeyClass;
     private final String notificationKey;
     private final DateTime effectiveDate;
 
 
     public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName, final DateTime nextAvailableDate,
                                final PersistentQueueEntryLifecycleState lifecycleState,
-                               final String notificationKey, final DateTime effectiveDate) {
+                               final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
         super(id);
         this.ordering = ordering;
         this.owner = owner;
@@ -43,12 +44,13 @@ public class DefaultNotification extends EntityBase implements Notification {
         this.queueName = queueName;
         this.nextAvailableDate = nextAvailableDate;
         this.lifecycleState = lifecycleState;
+        this.notificationKeyClass = notificationKeyClass;
         this.notificationKey = notificationKey;
         this.effectiveDate = effectiveDate;
     }
 
-    public DefaultNotification(final String queueName, final String createdOwner, final String notificationKey, final DateTime effectiveDate) {
-        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
+        this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKeyClass, notificationKey, effectiveDate);
     }
 
     @Override
@@ -91,6 +93,12 @@ public class DefaultNotification extends EntityBase implements Notification {
     }
 
     @Override
+    public String getNotificationKeyClass() {
+        return notificationKeyClass;
+    }
+
+    
+    @Override
     public String getNotificationKey() {
         return notificationKey;
     }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 607c1ba..19923b3 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -25,6 +26,7 @@ import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.config.NotificationConfig;
+import com.ning.billing.util.bus.dao.BusEventEntry;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -56,7 +58,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
             nbProcessedEvents.incrementAndGet();
             logDebug("handling notification %s, key = %s for time %s",
                      cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-            handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+            NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey()); 
+            handler.handleReadyNotification(key, cur.getEffectiveDate());
             result++;
             clearNotification(cur);
             logDebug("done handling notification %s, key = %s for time %s",
@@ -65,20 +68,24 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
         return result;
     }
 
+
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) {
-        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
-        dao.insertNotification(notification);
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao);
     }
 
     @Override
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
-                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
         final NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
-        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
-        transactionalNotificationDao.insertNotification(notification);
+        recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao);
+    }
+    
+    private void recordFutureNotificationInternal(final DateTime futureNotificationTime, final NotificationKey notificationKey, final NotificationSqlDao thisDao) throws IOException {
+        final String json = objectMapper.writeValueAsString(notificationKey);
+        final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+        thisDao.insertNotification(notification);
     }
-
 
     private void clearNotification(final Notification cleared) {
         dao.clearNotification(cleared.getId().toString(), hostname);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 53fd0e0..2ecea54 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -24,6 +24,8 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
 public interface Notification extends PersistentQueueEntryLifecycle, Entity {
     public Long getOrdering();
 
+    public String getNotificationKeyClass();
+    
     public String getNotificationKey();
 
     public DateTime getEffectiveDate();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
index 2cde9bf..534a99e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
@@ -21,6 +21,4 @@ package com.ning.billing.util.notificationq;
  */
 public interface NotificationKey {
 
-    @Override
-    public String toString();
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 6e09320..678da46 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,6 +16,8 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.io.IOException;
+
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
@@ -29,7 +31,8 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param futureNotificationTime the time at which the notification is ready
      * @param notificationKey        the key for that notification
      */
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey);
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey)
+        throws IOException;
 
     /**
      * Record from within a transaction the need to be called back when the notification is ready
@@ -39,7 +42,8 @@ public interface NotificationQueue extends QueueLifecycle {
      * @param notificationKey        the key for that notification
      */
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
-                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey);
+                                                        final DateTime futureNotificationTime, final NotificationKey notificationKey)
+        throws IOException;
 
 
     /**
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index e66d803..3b3d74d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -29,7 +29,7 @@ public interface NotificationQueueService {
          *
          * @param notificationKey the notification key associated to that notification entry
          */
-        public void handleReadyNotification(String notificationKey, DateTime eventDateTime);
+        public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime);
     }
 
     public static final class NotificationQueueAlreadyExists extends Exception {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index e938057..09bb1b5 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,7 +22,9 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import com.ning.billing.config.PersistentQueueConfig;
+import com.ning.billing.util.jackson.ObjectMapper;
 
 
 public abstract class PersistentQueueBase implements QueueLifecycle {
@@ -35,15 +37,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
     private final Executor executor;
     private final String svcName;
     private final long sleepTimeMs;
-
     private boolean isProcessingEvents;
     private int curActiveThreads;
 
+    protected final ObjectMapper objectMapper;
+    
     public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
         this.executor = executor;
         this.nbThreads = nbThreads;
         this.svcName = svcName;
         this.sleepTimeMs = config.getSleepTimeMs();
+        this.objectMapper = new ObjectMapper();        
         this.isProcessingEvents = false;
         this.curActiveThreads = 0;
     }
@@ -154,6 +158,18 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
             curActiveThreads = 0;
         }
     }
+    
+    protected <T> T deserializeEvent(final String className, final String json) {
+        try {
+            final Class<?> claz = Class.forName(className);
+            return (T) objectMapper.readValue(json, claz);
+        } catch (Exception e) {
+            log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
+            return null;
+        }
+    }
+
+
 
 
     @Override
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 9ea7c1a..c61cd30 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -102,7 +102,8 @@ CREATE TABLE notifications (
     record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
     id char(36) NOT NULL,
     created_date datetime NOT NULL,
-	notification_key varchar(256) NOT NULL,
+    class_name varchar(256) NOT NULL,
+	notification_key varchar(2048) NOT NULL,
 	creating_owner char(50) NOT NULL,
     effective_date datetime NOT NULL,
     queue_name char(64) NOT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 0731adc..13ff029 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -4,6 +4,7 @@ getReadyNotifications() ::= <<
     select
       record_id
       , id
+      , class_name
       , notification_key
       , created_date
       , creating_owner
@@ -62,6 +63,7 @@ removeNotificationsByKey() ::= <<
 insertNotification() ::= <<
     insert into notifications (
       id
+      , class_name
       , notification_key
       , created_date
       , creating_owner
@@ -72,6 +74,7 @@ insertNotification() ::= <<
       , processing_state
     ) values (
       :id
+      , :className
       , :notificationKey
       , :createdDate
       , :creatingOwner
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 4b5ec17..d7141d3 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -102,7 +102,7 @@ public class TestNotificationSqlDao {
 
         final String notificationKey = UUID.randomUUID().toString();
         final DateTime effDt = new DateTime();
-        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey, effDt);
+        final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, effDt);
         dao.insertNotification(notif);
 
         Thread.sleep(1000);
@@ -149,6 +149,7 @@ public class TestNotificationSqlDao {
                 final Notification res = handle.createQuery("   select" +
                                                                     " record_id " +
                                                                     ", id" +
+                                                                    ", class_name" +                                                                    
                                                                     ", notification_key" +
                                                                     ", created_date" +
                                                                     ", creating_owner" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 258e6da..5e7d607 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -16,6 +16,7 @@
 
 package com.ning.billing.util.notificationq;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -25,6 +26,7 @@ import java.util.TreeSet;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.ning.billing.config.NotificationConfig;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
@@ -33,6 +35,8 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueue
 public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
     private final TreeSet<Notification> notifications;
 
+    ObjectMapper objectMapper = new ObjectMapper();
+    
     public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
         super(clock, svcName, queueName, handler, config);
         notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@@ -48,8 +52,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     }
 
     @Override
-    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) {
-        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.toString(), futureNotificationTime);
+    public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException  {
+        final String json = objectMapper.writeValueAsString(notificationKey);
+        final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
         synchronized (notifications) {
             notifications.add(notification);
         }
@@ -58,7 +63,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     @Override
     public void recordFutureNotificationFromTransaction(
             final Transmogrifier transactionalDao, final DateTime futureNotificationTime,
-            final NotificationKey notificationKey) {
+            final NotificationKey notificationKey) throws IOException  {
         recordFutureNotification(futureNotificationTime, notificationKey);
     }
 
@@ -94,8 +99,12 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
 
         result = readyNotifications.size();
         for (final Notification cur : readyNotifications) {
-            handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
-            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+            
+            
+            NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey()); 
+            handler.handleReadyNotification(key, cur.getEffectiveDate());
+            final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED,
+                    cur.getNotificationKeyClass(), cur.getNotificationKey(), cur.getEffectiveDate());
             oldNotifications.add(cur);
             processedNotifications.add(processedNotification);
         }
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index d972537..11db32e 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -40,6 +40,8 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
@@ -84,6 +86,28 @@ public class TestNotificationQueue {
         helper.initDb(testDdl);
     }
 
+    
+    private final static class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
+        
+        private final String value;
+
+        @JsonCreator
+        public TestNotificationKey(@JsonProperty("value") String value) {
+            super();
+            this.value = value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public int compareTo(TestNotificationKey arg0) {
+            return value.compareTo(arg0.value);
+        }
+    }
+    
+    
     @BeforeSuite(groups = "slow")
     public void setup() throws Exception {
         startMysql();
@@ -124,21 +148,21 @@ public class TestNotificationQueue {
     @Test(groups = {"slow"}, enabled = true)
     public void testSimpleNotification() throws Exception {
 
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+        final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        log.info("Handler received key: " + notificationKey);
+                new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+                synchronized (expectedNotifications) {
+                    log.info("Handler received key: " + notificationKey);
 
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 1, 10000));
+                    expectedNotifications.put(notificationKey, Boolean.TRUE);
+                    expectedNotifications.notify();
+                }
+            }
+        },
+        getNotificationConfig(false, 100, 1, 10000));
 
 
         queue.startQueue();
@@ -147,24 +171,20 @@ public class TestNotificationQueue {
         final DummyObject obj = new DummyObject("foo", key);
         final DateTime now = new DateTime();
         final DateTime readyTime = now.plusMillis(2000);
-        final NotificationKey notificationKey = new NotificationKey() {
-            @Override
-            public String toString() {
-                return key.toString();
-            }
-        };
-        expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+        final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+
+        expectedNotifications.put(notificationKey, Boolean.FALSE);
 
 
         // Insert dummy to be processed in 2 sec'
         dao.inTransaction(new Transaction<Void, DummySqlTest>() {
             @Override
             public Void inTransaction(final DummySqlTest transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 transactional.insertDummy(obj);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                                                              readyTime, notificationKey);
+                        readyTime, notificationKey);
                 log.info("Posted key: " + notificationKey);
 
                 return null;
@@ -178,29 +198,29 @@ public class TestNotificationQueue {
         await().atMost(1, MINUTES).until(new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
-                return expectedNotifications.get(notificationKey.toString());
+                return expectedNotifications.get(notificationKey);
             }
         });
 
         queue.stopQueue();
-        Assert.assertTrue(expectedNotifications.get(notificationKey.toString()));
+        Assert.assertTrue(expectedNotifications.get(notificationKey));
     }
 
     @Test(groups = "slow")
     public void testManyNotifications() throws InterruptedException {
-        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+        final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
 
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
-                                                                                    synchronized (expectedNotifications) {
-                                                                                        expectedNotifications.put(notificationKey, Boolean.TRUE);
-                                                                                        expectedNotifications.notify();
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000));
+                new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+                synchronized (expectedNotifications) {
+                    expectedNotifications.put(notificationKey, Boolean.TRUE);
+                    expectedNotifications.notify();
+                }
+            }
+        },
+        getNotificationConfig(false, 100, 10, 10000));
 
 
         queue.startQueue();
@@ -215,22 +235,17 @@ public class TestNotificationQueue {
             final DummyObject obj = new DummyObject("foo", key);
             final int currentIteration = i;
 
-            final NotificationKey notificationKey = new NotificationKey() {
-                @Override
-                public String toString() {
-                    return key.toString();
-                }
-            };
-            expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+            final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+            expectedNotifications.put(notificationKey, Boolean.FALSE);
 
             dao.inTransaction(new Transaction<Void, DummySqlTest>() {
                 @Override
                 public Void inTransaction(final DummySqlTest transactional,
-                                          final TransactionStatus status) throws Exception {
+                        final TransactionStatus status) throws Exception {
 
                     transactional.insertDummy(obj);
                     queue.recordFutureNotificationFromTransaction(transactional,
-                                                                  now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+                            now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
                     return null;
                 }
             });
@@ -266,6 +281,12 @@ public class TestNotificationQueue {
         } while (nbTry-- > 0);
 
         queue.stopQueue();
+        log.info("STEPH GOT SIZE " +  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+            @Override
+            public boolean apply(final Boolean input) {
+                return input;
+            }
+        }).size());
         assertEquals(success, true);
     }
 
@@ -278,8 +299,8 @@ public class TestNotificationQueue {
     @Test(groups = {"slow"}, enabled = true)
     public void testMultipleHandlerNotification() throws Exception {
 
-        final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
-        final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
+        final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
+        final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
 
         final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
 
@@ -298,23 +319,23 @@ public class TestNotificationQueue {
 
         final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
                 log.info("Fred received key: " + notificationKey);
                 expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
         },
-                                                                                             config);
+        config);
 
         final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
             @Override
-            public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
+            public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
                 log.info("Barney received key: " + notificationKey);
                 expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
                 eventsReceived++;
             }
         },
-                                                                                               config);
+        config);
 
         queueFred.startQueue();
         //		We don't start Barney so it can never pick up notifications
@@ -324,37 +345,27 @@ public class TestNotificationQueue {
         final DummyObject obj = new DummyObject("foo", key);
         final DateTime now = new DateTime();
         final DateTime readyTime = now.plusMillis(2000);
-        final NotificationKey notificationKeyFred = new NotificationKey() {
-            @Override
-            public String toString() {
-                return "Fred";
-            }
-        };
+        final NotificationKey notificationKeyFred = new TestNotificationKey("Fred");
 
 
-        final NotificationKey notificationKeyBarney = new NotificationKey() {
-            @Override
-            public String toString() {
-                return "Barney";
-            }
-        };
+        final NotificationKey notificationKeyBarney = new TestNotificationKey("Barney"); 
 
-        expectedNotificationsFred.put(notificationKeyFred.toString(), Boolean.FALSE);
-        expectedNotificationsFred.put(notificationKeyBarney.toString(), Boolean.FALSE);
+        expectedNotificationsFred.put(notificationKeyFred, Boolean.FALSE);
+        expectedNotificationsFred.put(notificationKeyBarney, Boolean.FALSE);
 
 
         // Insert dummy to be processed in 2 sec'
         dao.inTransaction(new Transaction<Void, DummySqlTest>() {
             @Override
             public Void inTransaction(final DummySqlTest transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 transactional.insertDummy(obj);
                 queueFred.recordFutureNotificationFromTransaction(transactional,
-                                                                  readyTime, notificationKeyFred);
+                        readyTime, notificationKeyFred);
                 log.info("posted key: " + notificationKeyFred.toString());
                 queueBarney.recordFutureNotificationFromTransaction(transactional,
-                                                                    readyTime, notificationKeyBarney);
+                        readyTime, notificationKeyBarney);
                 log.info("posted key: " + notificationKeyBarney.toString());
 
                 return null;
@@ -379,12 +390,12 @@ public class TestNotificationQueue {
         }
 
         queueFred.stopQueue();
-        Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
-        Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
+        Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred));
+        Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
     }
 
     NotificationConfig getNotificationConfig(final boolean off,
-                                             final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+            final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
         return new NotificationConfig() {
             @Override
             public boolean isNotificationProcessingOff() {
@@ -403,31 +414,21 @@ public class TestNotificationQueue {
     public void testRemoveNotifications() throws InterruptedException {
 
         final UUID key = UUID.randomUUID();
-        final NotificationKey notificationKey = new NotificationKey() {
-            @Override
-            public String toString() {
-                return key.toString();
-            }
-        };
+        final NotificationKey notificationKey = new TestNotificationKey(key.toString());
         final UUID key2 = UUID.randomUUID();
-        final NotificationKey notificationKey2 = new NotificationKey() {
-            @Override
-            public String toString() {
-                return key2.toString();
-            }
-        };
+        final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
 
         final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
-                                                                            new NotificationQueueHandler() {
-                                                                                @Override
-                                                                                public void handleReadyNotification(final String key, final DateTime eventDateTime) {
-                                                                                    if (key.equals(notificationKey) || key.equals(notificationKey2)) { //ignore stray events from other tests
-                                                                                        log.info("Received notification with key: " + notificationKey);
-                                                                                        eventsReceived++;
-                                                                                    }
-                                                                                }
-                                                                            },
-                                                                            getNotificationConfig(false, 100, 10, 10000));
+                new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+                if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+                    log.info("Received notification with key: " + notificationKey);
+                    eventsReceived++;
+                }
+            }
+        },
+        getNotificationConfig(false, 100, 10, 10000));
 
 
         queue.startQueue();
@@ -440,14 +441,14 @@ public class TestNotificationQueue {
         dao.inTransaction(new Transaction<Void, DummySqlTest>() {
             @Override
             public Void inTransaction(final DummySqlTest transactional,
-                                      final TransactionStatus status) throws Exception {
+                    final TransactionStatus status) throws Exception {
 
                 queue.recordFutureNotificationFromTransaction(transactional,
-                                                              start.plus(nextReadyTimeIncrementMs), notificationKey);
+                        start.plus(nextReadyTimeIncrementMs), notificationKey);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                                                              start.plus(2 * nextReadyTimeIncrementMs), notificationKey);
+                        start.plus(2 * nextReadyTimeIncrementMs), notificationKey);
                 queue.recordFutureNotificationFromTransaction(transactional,
-                                                              start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
+                        start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
                 return null;
             }
         });