killbill-aplcache

Changes

entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 253(+0 -253)

entitlement/src/main/java/com/ning/billing/entitlement/engine/core/DefaultApiEventProcessor.java 63(+0 -63)

entitlement/src/test/java/com/ning/billing/entitlement/engine/core/MockApiEventProcessorMemory.java 63(+0 -63)

pom.xml 1(+1 -0)

util/pom.xml 15(+14 -1)

util/src/main/java/com/ning/billing/util/notification/NotificationSystem.java 51(+0 -51)

Details

diff --git a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
index 0fdb5d3..f501267 100644
--- a/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/DefaultAccountDao.java
@@ -18,10 +18,11 @@ package com.ning.billing.account.dao;
 
 import java.util.List;
 import java.util.UUID;
+
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.exceptions.TransactionFailedException;
+
 import com.google.inject.Inject;
 import com.ning.billing.ErrorCode;
 import com.ning.billing.account.api.Account;
@@ -157,7 +158,7 @@ public class DefaultAccountDao implements AccountDao {
             }
         }
     }
-    
+
     @Override
 	public void deleteByKey(final String externalKey) throws AccountApiException {
     	try {
@@ -204,7 +205,7 @@ public class DefaultAccountDao implements AccountDao {
         }
     }
 
-    private void saveCustomFieldsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
+    private void saveTagsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
         String accountId = account.getId().toString();
         String objectType = account.getObjectName();
 
@@ -219,7 +220,7 @@ public class DefaultAccountDao implements AccountDao {
         }
     }
 
-    private void saveTagsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
+    private void saveCustomFieldsFromWithinTransaction(final Account account, final AccountSqlDao transactionalDao, final boolean isCreation) {
         String accountId = account.getId().toString();
         String objectType = account.getObjectName();
 
@@ -234,5 +235,5 @@ public class DefaultAccountDao implements AccountDao {
         }
     }
 
-	
+
 }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
index d4fb56a..48663fa 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/AnalyticsTestModule.java
@@ -26,6 +26,7 @@ import com.ning.billing.dbi.MysqlTestingHelper;
 import com.ning.billing.entitlement.glue.EntitlementModule;
 import com.ning.billing.util.glue.ClockModule;
 import com.ning.billing.util.glue.EventBusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
 import com.ning.billing.util.glue.TagStoreModule;
 
 public class AnalyticsTestModule extends AnalyticsModule
@@ -42,6 +43,7 @@ public class AnalyticsTestModule extends AnalyticsModule
         install(new EntitlementModule());
         install(new ClockModule());
         install(new TagStoreModule());
+        install(new NotificationQueueModule());
 
         // Install the Dao layer
         final MysqlTestingHelper helper = new MysqlTestingHelper();
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index 94b28e5..d826d5c 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -33,11 +33,6 @@ public interface InvoiceUserApi {
 
     public void notifyOfPaymentAttempt(InvoicePayment invoicePayment);
 
-//    public void paymentAttemptFailed(UUID invoiceId, UUID paymentId, DateTime paymentAttemptDate);
-//
-//    public void paymentAttemptSuccessful(UUID invoiceId, BigDecimal amount, Currency currency,
-//                                         UUID paymentId, DateTime paymentDate);
-    
     public BigDecimal getAccountBalance(UUID accountId);
 
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
index 75a4ab2..a9e25dc 100644
--- a/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/CreditCardPaymentMethodInfo.java
@@ -23,6 +23,12 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
         private String cardType;
         private String expirationDate;
         private String maskNumber;
+        private String cardAddress1;
+        private String cardAddress2;
+        private String cardCity;
+        private String cardState;
+        private String cardPostalCode;
+        private String cardCountry;
 
         public Builder() {
             super(Builder.class);
@@ -30,6 +36,16 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
 
         public Builder(CreditCardPaymentMethodInfo src) {
             super(Builder.class, src);
+            this.cardHolderName = src.cardHolderName;
+            this.cardType = src.cardType;
+            this.expirationDate = src.expirationDate;
+            this.cardAddress1 = src.cardAddress1;
+            this.cardAddress2 = src.cardAddress2;
+            this.cardCity = src.cardCity;
+            this.cardState = src.cardState;
+            this.cardPostalCode = src.cardPostalCode;
+            this.cardCountry = src.cardCountry;
+            this.maskNumber = src.maskNumber;
         }
 
         public Builder setCardHolderName(String cardHolderName) {
@@ -47,13 +63,55 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
             return this;
         }
 
+        public Builder setCardAddress1(String creditCardAddress1) {
+            this.cardAddress1 = creditCardAddress1;
+            return this;
+        }
+
+        public Builder setCardAddress2(String creditCardAddress2) {
+            this.cardAddress2 = creditCardAddress2;
+            return this;
+        }
+
+        public Builder setCardCity(String creditCardCity) {
+            this.cardCity = creditCardCity;
+            return this;
+        }
+
+        public Builder setCardState(String creditCardState) {
+            this.cardState = creditCardState;
+            return this;
+        }
+
+        public Builder setCardPostalCode(String creditCardPostalCode) {
+            this.cardPostalCode = creditCardPostalCode;
+            return this;
+        }
+
+        public Builder setCardCountry(String creditCardCountry) {
+            this.cardCountry = creditCardCountry;
+            return this;
+        }
+
         public Builder setMaskNumber(String maskNumber) {
             this.maskNumber = maskNumber;
             return this;
         }
 
         public CreditCardPaymentMethodInfo build() {
-            return new CreditCardPaymentMethodInfo(id, accountId, defaultMethod, cardHolderName, cardType, expirationDate, maskNumber);
+            return new CreditCardPaymentMethodInfo(id,
+                                                   accountId,
+                                                   defaultMethod,
+                                                   cardHolderName,
+                                                   cardType,
+                                                   expirationDate,
+                                                   maskNumber,
+                                                   cardAddress1,
+                                                   cardAddress2,
+                                                   cardCity,
+                                                   cardState,
+                                                   cardPostalCode,
+                                                   cardCountry);
         }
     }
 
@@ -61,6 +119,12 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
     private final String cardType;
     private final String expirationDate;
     private final String maskNumber;
+    private final String cardAddress1;
+    private final String cardAddress2;
+    private final String cardCity;
+    private final String cardState;
+    private final String cardPostalCode;
+    private final String cardCountry;
 
     public CreditCardPaymentMethodInfo(String id,
                                    String accountId,
@@ -68,12 +132,25 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
                                    String cardHolderName,
                                    String cardType,
                                    String expirationDate,
-                                   String maskNumber) {
+                                   String maskNumber,
+                                   String cardAddress1,
+                                   String cardAddress2,
+                                   String cardCity,
+                                   String cardState,
+                                   String cardPostalCode,
+                                   String cardCountry) {
+
       super(id, accountId, defaultMethod, "CreditCard");
       this.cardHolderName = cardHolderName;
       this.cardType = cardType;
       this.expirationDate = expirationDate;
       this.maskNumber = maskNumber;
+      this.cardAddress1 = cardAddress1;
+      this.cardAddress2 = cardAddress2;
+      this.cardCity = cardCity;
+      this.cardState = cardState;
+      this.cardPostalCode = cardPostalCode;
+      this.cardCountry = cardCountry;
     }
 
     public String getCardHolderName() {
@@ -84,6 +161,30 @@ public final class CreditCardPaymentMethodInfo extends PaymentMethodInfo {
       return cardType;
     }
 
+    public String getCardAddress1() {
+        return cardAddress1;
+    }
+
+    public String getCardAddress2() {
+        return cardAddress2;
+    }
+
+    public String getCardCity() {
+        return cardCity;
+    }
+
+    public String getCardState() {
+        return cardState;
+    }
+
+    public String getCardPostalCode() {
+        return cardPostalCode;
+    }
+
+    public String getCardCountry() {
+        return cardCountry;
+    }
+
     public String getExpirationDate() {
       return expirationDate;
     }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
index 8e665ed..189e87c 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentApi.java
@@ -23,18 +23,19 @@ import javax.annotation.Nullable;
 import com.ning.billing.account.api.Account;
 
 public interface PaymentApi {
-    Either<PaymentError, PaymentMethodInfo> getPaymentMethod(@Nullable String accountKey, String paymentMethodId);
 
-    Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
+    Either<PaymentError, Void> updatePaymentGateway(String accountKey);
 
-    Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
+    Either<PaymentError, PaymentMethodInfo> getPaymentMethod(@Nullable String accountKey, String paymentMethodId);
 
-    Either<PaymentError, Void> updatePaymentGateway(String accountKey);
+    Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
 
-    Either<PaymentError, String> addPaypalPaymentMethod(@Nullable String accountKey, PaypalPaymentMethodInfo paypalPaymentMethod);
+    Either<PaymentError, String> addPaymentMethod(@Nullable String accountKey, PaymentMethodInfo paymentMethod);
 
     Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
 
+    Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
+
     List<Either<PaymentError, PaymentInfo>> createPayment(String accountKey, List<String> invoiceIds);
     List<Either<PaymentError, PaymentInfo>> createPayment(Account account, List<String> invoiceIds);
 
@@ -44,7 +45,7 @@ public interface PaymentApi {
 
     Either<PaymentError, String> createPaymentProviderAccount(Account account);
 
-    Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account);
+    Either<PaymentError, Void> updatePaymentProviderAccountContact(Account account);
 
     PaymentAttempt getPaymentAttemptForPaymentId(String id);
 
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java b/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
index 553d6a2..47cad39 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaymentProviderAccount.java
@@ -20,18 +20,18 @@ import com.google.common.base.Objects;
 
 public class PaymentProviderAccount {
     private final String id;
-    private final String accountNumber;
+    private final String accountKey;
     private final String accountName;
     private final String phoneNumber;
     private final String defaultPaymentMethodId;
 
     public PaymentProviderAccount(String id,
-                                  String accountNumber,
+                                  String accountKey,
                                   String accountName,
                                   String phoneNumber,
                                   String defaultPaymentMethodId) {
         this.id = id;
-        this.accountNumber = accountNumber;
+        this.accountKey = accountKey;
         this.accountName = accountName;
         this.phoneNumber = phoneNumber;
         this.defaultPaymentMethodId = defaultPaymentMethodId;
@@ -41,8 +41,8 @@ public class PaymentProviderAccount {
         return id;
     }
 
-    public String getAccountNumber() {
-        return accountNumber;
+    public String getAccountKey() {
+        return accountKey;
     }
 
     public String getAccountName() {
@@ -59,18 +59,27 @@ public class PaymentProviderAccount {
 
     public static class Builder {
         private String id;
-        private String accountNumber;
+        private String accountKey;
         private String accountName;
         private String phoneNumber;
         private String defaultPaymentMethodId;
 
+        public Builder copyFrom(PaymentProviderAccount src) {
+            this.id = src.getId();
+            this.accountKey = src.getAccountKey();
+            this.accountName = src.getAccountName();
+            this.phoneNumber = src.getPhoneNumber();
+            this.defaultPaymentMethodId = src.getDefaultPaymentMethodId();
+            return this;
+        }
+
         public Builder setId(String id) {
             this.id = id;
             return this;
         }
 
-        public Builder setAccountNumber(String accountNumber) {
-            this.accountNumber = accountNumber;
+        public Builder setAccountKey(String accountKey) {
+            this.accountKey = accountKey;
             return this;
         }
 
@@ -90,7 +99,7 @@ public class PaymentProviderAccount {
         }
 
         public PaymentProviderAccount build() {
-            return new PaymentProviderAccount(id, accountNumber, accountName, phoneNumber, defaultPaymentMethodId);
+            return new PaymentProviderAccount(id, accountKey, accountName, phoneNumber, defaultPaymentMethodId);
         }
 
     }
@@ -98,7 +107,7 @@ public class PaymentProviderAccount {
     @Override
     public int hashCode() {
         return Objects.hashCode(id,
-                                accountNumber,
+                                accountKey,
                                 accountName,
                                 phoneNumber,
                                 defaultPaymentMethodId);
@@ -113,7 +122,8 @@ public class PaymentProviderAccount {
             }
             else {
                 return Objects.equal(id, other.id) &&
-                       Objects.equal(accountNumber, other.accountNumber) &&
+                       Objects.equal(accountKey, other.accountKey) &&
+                       Objects.equal(accountName, other.accountName) &&
                        Objects.equal(phoneNumber, other.phoneNumber) &&
                        Objects.equal(defaultPaymentMethodId, other.defaultPaymentMethodId);
             }
@@ -121,4 +131,9 @@ public class PaymentProviderAccount {
         return false;
     }
 
+    @Override
+    public String toString() {
+        return "PaymentProviderAccount [id=" + id + ", accountKey=" + accountKey + ", accountName=" + accountName + ", phoneNumber=" + phoneNumber + ", defaultPaymentMethodId=" + defaultPaymentMethodId + "]";
+    }
+
 }
diff --git a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
index 65c36e9..0977071 100644
--- a/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
+++ b/api/src/main/java/com/ning/billing/payment/api/PaypalPaymentMethodInfo.java
@@ -20,6 +20,8 @@ import com.google.common.base.Strings;
 
 
 public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
+    public static final String TYPE = "PayPal";
+
     public static final class Builder extends BuilderBase<PaypalPaymentMethodInfo, Builder> {
         private String baid;
         private String email;
@@ -30,6 +32,8 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
 
         public Builder(PaypalPaymentMethodInfo src) {
             super(Builder.class, src);
+            this.baid = src.baid;
+            this.email = src.email;
         }
 
         public Builder setBaid(String baid) {
@@ -55,10 +59,10 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
                                    Boolean defaultMethod,
                                    String baid,
                                    String email) {
-        super(id, accountId, defaultMethod, "PayPal");
+        super(id, accountId, defaultMethod, TYPE);
 
-        if (Strings.isNullOrEmpty(accountId) || Strings.isNullOrEmpty(baid) || Strings.isNullOrEmpty(email)) {
-            throw new IllegalArgumentException("accountId, baid and email should be present");
+        if (Strings.isNullOrEmpty(baid) || Strings.isNullOrEmpty(email)) {
+            throw new IllegalArgumentException("baid and email should be present");
         }
 
         this.baid = baid;
@@ -72,4 +76,10 @@ public final class PaypalPaymentMethodInfo extends PaymentMethodInfo {
     public String getEmail() {
         return email;
     }
+
+    @Override
+    public String toString() {
+        return "PaypalPaymentMethodInfo [baid=" + baid + ", email=" + email + "]";
+    }
+
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
index c2f5878..19b851d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
@@ -18,7 +18,11 @@ package com.ning.billing.entitlement.api.test;
 
 import com.google.inject.Inject;
 import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,20 +32,32 @@ public class DefaultEntitlementTestApi implements EntitlementTestApi {
 
     private final static Logger log = LoggerFactory.getLogger(DefaultEntitlementTestApi.class);
 
-    private final EventNotifier apiEventProcessor;
     private final EntitlementConfig config;
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public DefaultEntitlementTestApi(EventNotifier apiEventProcessor, EntitlementConfig config) {
-        this.apiEventProcessor = apiEventProcessor;
+    public DefaultEntitlementTestApi(NotificationQueueService notificationQueueService, EntitlementConfig config) {
         this.config = config;
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
     public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
         if (config.isEventProcessingOff()) {
             log.warn("Running event processing loop");
-            apiEventProcessor.processAllReadyEvents(subscriptionsIds, recursive, oneEventOnly);
+            NotificationQueue queue = getNotificationQueue();
+            queue.processReadyNotification();
+
+        }
+    }
+
+    private NotificationQueue getNotificationQueue() {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                    Engine.NOTIFICATION_QUEUE_NAME);
+            return subscritionEventQueue;
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3eb5dd3..7bd1124 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -16,6 +16,9 @@
 
 package com.ning.billing.entitlement.engine.core;
 
+import java.util.UUID;
+
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +48,16 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.eventbus.EventBus;
 import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 
 public class Engine implements EventListener, EntitlementService {
 
-    private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
+    public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
+    public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
 
     private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
     private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
@@ -58,33 +67,36 @@ public class Engine implements EventListener, EntitlementService {
 
     private final Clock clock;
     private final EntitlementDao dao;
-    private final EventNotifier apiEventProcessor;
     private final PlanAligner planAligner;
     private final EntitlementUserApi userApi;
     private final EntitlementBillingApi billingApi;
     private final EntitlementTestApi testApi;
     private final EntitlementMigrationApi migrationApi;
     private final EventBus eventBus;
+    private final EntitlementConfig config;
+    private final NotificationQueueService notificationQueueService;
 
     private boolean startedNotificationThread;
+    private boolean stoppedNotificationThread;
+    private NotificationQueue subscritionEventQueue;
 
     @Inject
-    public Engine(Clock clock, EntitlementDao dao, EventNotifier apiEventProcessor,
-            PlanAligner planAligner, EntitlementConfig config, DefaultEntitlementUserApi userApi,
+    public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
+            EntitlementConfig config, DefaultEntitlementUserApi userApi,
             DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
-            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus) {
+            DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+            NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
         this.dao = dao;
-        this.apiEventProcessor = apiEventProcessor;
         this.planAligner = planAligner;
         this.userApi = userApi;
         this.testApi = testApi;
         this.billingApi = billingApi;
         this.migrationApi = migrationApi;
+        this.config = config;
         this.eventBus = eventBus;
-
-        this.startedNotificationThread = false;
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
@@ -92,20 +104,75 @@ public class Engine implements EventListener, EntitlementService {
         return ENTITLEMENT_SERVICE_NAME;
     }
 
-
     @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
     public void initialize() {
+
+        try {
+            this.stoppedNotificationThread = false;
+            this.startedNotificationThread = false;
+            subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+                    NOTIFICATION_QUEUE_NAME,
+                    new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey) {
+                    EntitlementEvent event = dao.getEventById(UUID.fromString(notificationKey));
+                    if (event == null) {
+                        log.warn("Failed to extract event for notification key {}", notificationKey);
+                    } else {
+                        processEventReady(event);
+                    }
+                }
+
+                @Override
+                public void completedQueueStop() {
+                    synchronized (this) {
+                        stoppedNotificationThread = true;
+                        this.notifyAll();
+                    }
+                }
+                @Override
+                public void completedQueueStart() {
+                    synchronized (this) {
+                        startedNotificationThread = true;
+                        this.notifyAll();
+                    }
+                }
+            },
+            new NotificationConfig() {
+                @Override
+                public boolean isNotificationProcessingOff() {
+                    return config.isEventProcessingOff();
+                }
+                @Override
+                public long getNotificationSleepTimeMs() {
+                    return config.getNotificationSleepTimeMs();
+                }
+                @Override
+                public int getDaoMaxReadyEvents() {
+                    return config.getDaoMaxReadyEvents();
+                }
+                @Override
+                public long getDaoClaimTimeMs() {
+                    return config.getDaoMaxReadyEvents();
+                }
+            });
+        } catch (NotficationQueueAlreadyExists e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
     public void start() {
-        apiEventProcessor.startNotifications(this);
+        subscritionEventQueue.startQueue();
         waitForNotificationStartCompletion();
     }
 
     @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
     public void stop() {
-        apiEventProcessor.stopNotifications();
+        if (subscritionEventQueue != null) {
+            subscritionEventQueue.stopQueue();
+            waitForNotificationStopCompletion();
+        }
         startedNotificationThread = false;
     }
 
@@ -133,6 +200,9 @@ public class Engine implements EventListener, EntitlementService {
 
     @Override
     public void processEventReady(EntitlementEvent event) {
+        if (!event.isActive()) {
+            return;
+        }
         SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId());
         if (subscription == null) {
             log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
@@ -148,23 +218,20 @@ public class Engine implements EventListener, EntitlementService {
         }
     }
 
-    //
-    // We want to ensure the notification thread is indeed started when we return from start()
-    //
-    @Override
-    public void completedNotificationStart() {
-        synchronized (this) {
-            startedNotificationThread = true;
-            this.notifyAll();
-        }
+    private void waitForNotificationStartCompletion() {
+        waitForNotificationEventCompletion(true);
     }
 
-    private void waitForNotificationStartCompletion() {
+    private void waitForNotificationStopCompletion() {
+        waitForNotificationEventCompletion(false);
+    }
+
+    private void waitForNotificationEventCompletion(boolean startEvent) {
 
         long ini = System.nanoTime();
         synchronized(this) {
             do {
-                if (startedNotificationThread) {
+                if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
                     break;
                 }
                 try {
@@ -173,14 +240,18 @@ public class Engine implements EventListener, EntitlementService {
                     Thread.currentThread().interrupt();
                     throw new EntitlementError(e);
                 }
-            } while (!startedNotificationThread &&
+            } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
                     (System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
 
-            if (!startedNotificationThread) {
-                log.error("Could not start notification thread in %d msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+            if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
+                log.error("Could not {} notification thread in {} msec !!!",
+                        (startEvent ? "start" : "stop"),
+                        MAX_NOTIFICATION_THREAD_WAIT_MS);
                 throw new EntitlementError("Failed to start service!!");
             }
-            log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+            log.info("Notification thread has been {} in {} ms",
+                    (startEvent ? "started" : "stopped"),
+                    (System.nanoTime() - ini) / NANO_TO_MS);
         }
     }
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
index 7a84c51..e9962d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
@@ -24,5 +24,4 @@ public interface EventListener {
 
     public void processEventReady(EntitlementEvent event);
 
-    public void completedNotificationStart();
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index b118110..ea62b84 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -54,14 +54,12 @@ public interface EntitlementDao {
     // Event apis
     public void createNextPhaseEvent(UUID subscriptionId, EntitlementEvent nextPhase);
 
+    public EntitlementEvent getEventById(UUID eventId);
+
     public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId);
 
     public List<EntitlementEvent> getPendingEventsForSubscription(UUID subscriptionId);
 
-    public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId);
-
-    public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared);
-
     // Subscription creation, cancellation, changePlan apis
     public void createSubscription(SubscriptionData subscription, List<EntitlementEvent> initialEvents);
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index 016f005..72a71d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -16,9 +16,14 @@
 
 package com.ning.billing.entitlement.engine.dao;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
 import com.google.inject.Inject;
 import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
 import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -28,20 +33,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionData;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.entitlement.exceptions.EntitlementError;
-import com.ning.billing.util.Hostname;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
 
 public class EntitlementSqlDao implements EntitlementDao {
 
@@ -51,19 +62,17 @@ public class EntitlementSqlDao implements EntitlementDao {
     private final SubscriptionSqlDao subscriptionsDao;
     private final BundleSqlDao bundlesDao;
     private final EventSqlDao eventsDao;
-    private final EntitlementConfig config;
-    private final String hostname;
     private final SubscriptionFactory factory;
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public EntitlementSqlDao(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+    public EntitlementSqlDao(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
         this.clock = clock;
-        this.config = config;
         this.factory = factory;
         this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
         this.eventsDao = dbi.onDemand(EventSqlDao.class);
         this.bundlesDao = dbi.onDemand(BundleSqlDao.class);
-        this.hostname = Hostname.get();
+        this.notificationQueueService = notificationQueueService;
     }
 
     @Override
@@ -140,11 +149,24 @@ public class EntitlementSqlDao implements EntitlementDao {
                     TransactionStatus status) throws Exception {
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
                 dao.insertEvent(nextPhase);
+                recordFutureNotificationFromTransaction(dao,
+                        nextPhase.getEffectiveDate(),
+                        new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return nextPhase.getId().toString();
+                            }
+                        });
                 return null;
             }
         });
     }
 
+    @Override
+    public EntitlementEvent getEventById(UUID eventId) {
+        return eventsDao.getEventById(eventId.toString());
+    }
+
 
     @Override
     public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId) {
@@ -159,61 +181,6 @@ public class EntitlementSqlDao implements EntitlementDao {
         return results;
     }
 
-    @Override
-    public List<EntitlementEvent> getEventsReady(final UUID ownerId, final int sequenceId) {
-
-        final Date now = clock.getUTCNow().toDate();
-        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
-        log.debug(String.format("EntitlementDao getEventsReady START effectiveNow =  %s", now));
-
-        List<EntitlementEvent> events = eventsDao.inTransaction(new Transaction<List<EntitlementEvent>, EventSqlDao>() {
-
-            @Override
-            public List<EntitlementEvent> inTransaction(EventSqlDao dao,
-                    TransactionStatus status) throws Exception {
-
-                List<EntitlementEvent> claimedEvents = new ArrayList<EntitlementEvent>();
-                List<EntitlementEvent> input = dao.getReadyEvents(now, config.getDaoMaxReadyEvents());
-                for (EntitlementEvent cur : input) {
-                    final boolean claimed = (dao.claimEvent(ownerId.toString(), nextAvailable, cur.getId().toString(), now) == 1);
-                    if (claimed) {
-                        claimedEvents.add(cur);
-                        dao.insertClaimedHistory(sequenceId, ownerId.toString(), hostname, now, cur.getId().toString());
-                    }
-                }
-                return claimedEvents;
-            }
-        });
-
-        for (EntitlementEvent cur : events) {
-            log.debug(String.format("EntitlementDao %s [host %s] claimed events %s", ownerId, hostname, cur.getId()));
-            if (cur.getOwner() != null && !cur.getOwner().equals(ownerId)) {
-                log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
-            }
-        }
-        return events;
-    }
-
-    @Override
-    public void clearEventsReady(final UUID ownerId, final Collection<EntitlementEvent> cleared) {
-
-        log.debug(String.format("EntitlementDao clearEventsReady START cleared size = %d", cleared.size()));
-
-        eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
-
-            @Override
-            public Void inTransaction(EventSqlDao dao,
-                    TransactionStatus status) throws Exception {
-                // STEPH Same here batch would nice
-                for (EntitlementEvent cur : cleared) {
-                    dao.clearEvent(cur.getId().toString(), ownerId.toString());
-                    log.debug(String.format("EntitlementDao %s [host %s] cleared events %s", ownerId, hostname, cur.getId()));
-                }
-                return null;
-            }
-        });
-    }
 
     @Override
     public void createSubscription(final SubscriptionData subscription,
@@ -228,8 +195,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                 dao.insertSubscription(subscription);
                 // STEPH batch as well
                 EventSqlDao eventsDaoFromSameTranscation = dao.become(EventSqlDao.class);
-                for (EntitlementEvent cur : initialEvents) {
+                for (final EntitlementEvent cur : initialEvents) {
                     eventsDaoFromSameTranscation.insertEvent(cur);
+                    recordFutureNotificationFromTransaction(dao,
+                            cur.getEffectiveDate(),
+                            new NotificationKey() {
+                                @Override
+                                public String toString() {
+                                    return cur.getId().toString();
+                                }
+                            });
                 }
                 return null;
             }
@@ -246,6 +221,14 @@ public class EntitlementSqlDao implements EntitlementDao {
                 cancelNextChangeEventFromTransaction(subscriptionId, dao);
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
                 dao.insertEvent(cancelEvent);
+                recordFutureNotificationFromTransaction(dao,
+                        cancelEvent.getEffectiveDate(),
+                        new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return cancelEvent.getId().toString();
+                            }
+                        });
                 return null;
             }
         });
@@ -275,8 +258,16 @@ public class EntitlementSqlDao implements EntitlementDao {
 
                 if (existingCancelId != null) {
                     dao.unactiveEvent(existingCancelId.toString(), now);
-                    for (EntitlementEvent cur : uncancelEvents) {
+                    for (final EntitlementEvent cur : uncancelEvents) {
                         dao.insertEvent(cur);
+                        recordFutureNotificationFromTransaction(dao,
+                                cur.getEffectiveDate(),
+                                new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return cur.getId().toString();
+                            }
+                        });
                     }
                 }
                 return null;
@@ -292,8 +283,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                     TransactionStatus status) throws Exception {
                 cancelNextChangeEventFromTransaction(subscriptionId, dao);
                 cancelNextPhaseEventFromTransaction(subscriptionId, dao);
-                for (EntitlementEvent cur : changeEvents) {
+                for (final EntitlementEvent cur : changeEvents) {
                     dao.insertEvent(cur);
+                    recordFutureNotificationFromTransaction(dao,
+                            cur.getEffectiveDate(),
+                            new NotificationKey() {
+                        @Override
+                        public String toString() {
+                            return cur.getId().toString();
+                        }
+                    });
                 }
                 return null;
             }
@@ -366,8 +365,16 @@ public class EntitlementSqlDao implements EntitlementDao {
                     SubscriptionBundleData bundleData = curBundle.getData();
                     for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
                         SubscriptionData subData = curSubscription.getData();
-                        for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+                        for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                             transEventDao.insertEvent(curEvent);
+                            recordFutureNotificationFromTransaction(transEventDao,
+                                    curEvent.getEffectiveDate(),
+                                    new NotificationKey() {
+                                @Override
+                                public String toString() {
+                                    return curEvent.getId().toString();
+                                }
+                            });
                         }
                         transSubDao.insertSubscription(subData);
                     }
@@ -378,6 +385,7 @@ public class EntitlementSqlDao implements EntitlementDao {
         });
     }
 
+
     @Override
     public void undoMigration(final UUID accountId) {
 
@@ -406,4 +414,14 @@ public class EntitlementSqlDao implements EntitlementDao {
             transBundleDao.removeBundle(curBundle.getId().toString());
         }
     }
+
+    private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                Engine.NOTIFICATION_QUEUE_NAME);
+            subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
index de61217..5f485e5 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
@@ -19,7 +19,6 @@ package com.ning.billing.entitlement.engine.dao;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
 import com.ning.billing.entitlement.events.EventBaseBuilder;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import com.ning.billing.entitlement.events.phase.PhaseEvent;
 import com.ning.billing.entitlement.events.phase.PhaseEventBuilder;
 import com.ning.billing.entitlement.events.phase.PhaseEventData;
@@ -49,43 +48,31 @@ import java.util.UUID;
 @ExternalizedSqlViaStringTemplate3()
 public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transmogrifier  {
 
-    //
-    // APIs for event notifications
-    //
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
-    public List<EntitlementEvent> getReadyEvents(@Bind("now") Date now, @Bind("max") int max);
+    @Mapper(EventSqlMapper.class)
+    public EntitlementEvent getEventById(@Bind("event_id") String eventId);
 
     @SqlUpdate
-    public int claimEvent(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("event_id") String eventId, @Bind("now") Date now);
+    public void insertEvent(@Bind(binder = EventSqlDaoBinder.class) EntitlementEvent evt);
 
     @SqlUpdate
     public void removeEvents(@Bind("subscription_id") String subscriptionId);
 
     @SqlUpdate
-    public void clearEvent(@Bind("event_id") String eventId, @Bind("owner") String owner);
-
-    @SqlUpdate
-    public void insertEvent(@Bind(binder = IEventSqlDaoBinder.class) EntitlementEvent evt);
-
-    @SqlUpdate
-    public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner_id") String ownerId, @Bind("hostname") String hostname, @Bind("claimed_dt") Date clainedDate, @Bind("event_id") String eventId);
-
-    @SqlUpdate
     public void unactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
 
     @SqlUpdate
     public void reactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
 
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
+    @Mapper(EventSqlMapper.class)
     public List<EntitlementEvent> getFutureActiveEventForSubscription(@Bind("subscription_id") String subscriptionId, @Bind("now") Date now);
 
     @SqlQuery
-    @Mapper(IEventSqlMapper.class)
+    @Mapper(EventSqlMapper.class)
     public List<EntitlementEvent> getEventsForSubscription(@Bind("subscription_id") String subscriptionId);
 
-    public static class IEventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
+    public static class EventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
 
         private Date getDate(DateTime dateTime) {
             return dateTime == null ? null : dateTime.toDate();
@@ -106,13 +93,10 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
             stmt.bind("plist_name", (evt.getType() == EventType.API_USER) ? ((ApiEvent) evt).getPriceList() : null);
             stmt.bind("current_version", evt.getActiveVersion());
             stmt.bind("is_active", evt.isActive());
-            stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
-            stmt.bind("processing_owner", (String) null);
-            stmt.bind("processing_state", EventLifecycleState.AVAILABLE.toString());
         }
     }
 
-    public static class IEventSqlMapper implements ResultSetMapper<EntitlementEvent> {
+    public static class EventSqlMapper implements ResultSetMapper<EntitlementEvent> {
 
         private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
             final Timestamp resultStamp = r.getTimestamp(fieldName);
@@ -135,9 +119,6 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
             String priceListName = r.getString("plist_name");
             long currentVersion = r.getLong("current_version");
             boolean isActive = r.getBoolean("is_active");
-            DateTime nextAvailableDate = getDate(r, "processing_available_dt");
-            UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
-            EventLifecycleState processingState = EventLifecycleState.valueOf(r.getString("processing_state"));
 
             EventBaseBuilder<?> base = ((eventType == EventType.PHASE) ?
                     new PhaseEventBuilder() :
@@ -148,11 +129,7 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
                         .setEffectiveDate(effectiveDate)
                         .setProcessedDate(createdDate)
                         .setActiveVersion(currentVersion)
-                        .setActive(isActive)
-                        .setProcessingOwner(processingOwner)
-                        .setNextAvailableProcessingTime(nextAvailableDate)
-                        .setProcessingState(processingState);
-
+                        .setActive(isActive);
 
             EntitlementEvent result = null;
             if (eventType == EventType.PHASE) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
index af74752..b7bfece 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
@@ -21,7 +21,7 @@ import org.joda.time.DateTime;
 import java.util.UUID;
 
 
-public interface EntitlementEvent extends EventLifecycle, Comparable<EntitlementEvent> {
+public interface EntitlementEvent extends Comparable<EntitlementEvent> {
 
     public enum EventType {
         API_USER,
@@ -32,6 +32,16 @@ public interface EntitlementEvent extends EventLifecycle, Comparable<Entitlement
 
     public UUID getId();
 
+    public long getActiveVersion();
+
+    public void setActiveVersion(long activeVersion);
+
+    public boolean isActive();
+
+    public void deactivate();
+
+    public void reactivate();
+
     public DateTime getProcessedDate();
 
     public DateTime getRequestedDate();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
index 93dc9e1..9420fbf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
@@ -30,12 +30,8 @@ public abstract class EventBase implements EntitlementEvent {
     private final DateTime effectiveDate;
     private final DateTime processedDate;
 
-    // Lifecyle of the event
     private long activeVersion;
     private boolean isActive;
-    private UUID processingOwner;
-    private DateTime nextAvailableProcessingTime;
-    private EventLifecycleState processingState;
 
     public EventBase(EventBaseBuilder<?> builder) {
         this.uuid = builder.getUuid();
@@ -46,31 +42,17 @@ public abstract class EventBase implements EntitlementEvent {
 
         this.activeVersion = builder.getActiveVersion();
         this.isActive = builder.isActive();
-        this.processingOwner = builder.getProcessingOwner();
-        this.nextAvailableProcessingTime = builder.getNextAvailableProcessingTime();
-        this.processingState = builder.getProcessingState();
     }
 
     public EventBase(UUID subscriptionId, DateTime requestedDate,
             DateTime effectiveDate, DateTime processedDate,
             long activeVersion, boolean isActive) {
-        this(subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive, null, null, EventLifecycleState.AVAILABLE);
-    }
-
-    private EventBase(UUID subscriptionId, DateTime requestedDate,
-            DateTime effectiveDate, DateTime processedDate,
-            long activeVersion, boolean isActive,
-            UUID processingOwner, DateTime nextAvailableProcessingTime,
-            EventLifecycleState processingState) {
-        this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive,
-                processingOwner, nextAvailableProcessingTime, processingState);
+        this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive);
     }
 
     public EventBase(UUID id, UUID subscriptionId, DateTime requestedDate,
             DateTime effectiveDate, DateTime processedDate,
-            long activeVersion, boolean isActive,
-            UUID processingOwner, DateTime nextAvailableProcessingTime,
-            EventLifecycleState processingState) {
+            long activeVersion, boolean isActive) {
         this.uuid = id;
         this.subscriptionId = subscriptionId;
         this.requestedDate = requestedDate;
@@ -79,10 +61,6 @@ public abstract class EventBase implements EntitlementEvent {
 
         this.activeVersion = activeVersion;
         this.isActive = isActive;
-        this.processingOwner = processingOwner;
-        this.nextAvailableProcessingTime = nextAvailableProcessingTime;
-        this.processingState = processingState;
-
     }
 
 
@@ -138,64 +116,9 @@ public abstract class EventBase implements EntitlementEvent {
     }
 
 
-    @Override
-    public UUID getOwner() {
-        return processingOwner;
-    }
-
-    @Override
-    public void setOwner(UUID owner) {
-        this.processingOwner = owner;
-    }
-
-    @Override
-    public DateTime getNextAvailableDate() {
-        return nextAvailableProcessingTime;
-    }
-
-    @Override
-    public void setNextAvailableDate(DateTime dateTime) {
-        this.nextAvailableProcessingTime = dateTime;
-    }
-
-
-    @Override
-    public EventLifecycleState getProcessingState() {
-        return processingState;
-    }
-
-    @Override
-    public void setProcessingState(EventLifecycleState processingState) {
-        this.processingState = processingState;
-    }
-
-    @Override
-    public boolean isAvailableForProcessing(DateTime now) {
-
-        // Event got deactivated, will never be processed
-        if (!isActive) {
-            return false;
-        }
-
-        switch(processingState) {
-        case AVAILABLE:
-            break;
-        case IN_PROCESSING:
-            // Somebody already got the event, not available yet
-            if (nextAvailableProcessingTime.isAfter(now)) {
-                return false;
-            }
-            break;
-        case PROCESSED:
-            return false;
-        default:
-            throw new EntitlementError(String.format("Unkwnon IEvent processing state %s", processingState));
-        }
-        return effectiveDate.isBefore(now);
-    }
 
     //
-    // Really used for unit tesrs only as the sql implementation relies on date first and then event insertion
+    // Really used for unit tests only as the sql implementation relies on date first and then event insertion
     //
     // Order first by:
     // - effectiveDate, followed by processedDate, requestedDate
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
index 104fbef..17f5e15 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
@@ -16,7 +16,6 @@
 
 package com.ning.billing.entitlement.events;
 
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import org.joda.time.DateTime;
 
 import java.util.UUID;
@@ -32,15 +31,11 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
 
     private long activeVersion;
     private boolean isActive;
-    private UUID processingOwner;
-    private DateTime nextAvailableProcessingTime;
-    private EventLifecycleState processingState;
 
 
     public EventBaseBuilder() {
         this.uuid = UUID.randomUUID();
         this.isActive = true;
-        this.processingState = EventLifecycleState.AVAILABLE;
     }
 
     public EventBaseBuilder(EventBaseBuilder<?> copy) {
@@ -52,9 +47,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
 
         this.activeVersion = copy.activeVersion;
         this.isActive = copy.isActive;
-        this.processingOwner = copy.processingOwner;
-        this.nextAvailableProcessingTime = copy.nextAvailableProcessingTime;
-        this.processingState = copy.processingState;
     }
 
     public T setUuid(UUID uuid) {
@@ -92,21 +84,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
         return (T) this;
     }
 
-    public T setProcessingOwner(UUID processingOwner) {
-        this.processingOwner = processingOwner;
-        return (T) this;
-    }
-
-    public T setNextAvailableProcessingTime(DateTime nextAvailableProcessingTime) {
-        this.nextAvailableProcessingTime = nextAvailableProcessingTime;
-        return (T) this;
-    }
-
-    public T setProcessingState(EventLifecycleState processingState) {
-        this.processingState = processingState;
-        return (T) this;
-    }
-
     public UUID getUuid() {
         return uuid;
     }
@@ -134,16 +111,4 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
     public boolean isActive() {
         return isActive;
     }
-
-    public UUID getProcessingOwner() {
-        return processingOwner;
-    }
-
-    public DateTime getNextAvailableProcessingTime() {
-        return nextAvailableProcessingTime;
-    }
-
-    public EventLifecycleState getProcessingState() {
-        return processingState;
-    }
 }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
index 3f033a4..2438ddf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
@@ -58,17 +58,6 @@ public class ApiEventBase extends EventBase implements ApiEvent {
     }
 
 
-    public ApiEventBase(UUID id, UUID subscriptionId, DateTime processed, String eventPlan, String eventPhase,
-            String priceList, DateTime requestedDate,  ApiEventType eventType, DateTime effectiveDate, long activeVersion,
-            boolean isActive, UUID processingOwner, DateTime nextAvailableProcessingTime,EventLifecycleState processingState) {
-        super(id, subscriptionId, requestedDate, effectiveDate, processed, activeVersion, isActive, processingOwner, nextAvailableProcessingTime, processingState);
-        this.eventType = eventType;
-        this.eventPlan = eventPlan;
-        this.eventPlanPhase = eventPhase;
-        this.eventPriceList = priceList;
-    }
-
-
     @Override
     public ApiEventType getEventType() {
         return eventType;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
index c1268ca..d2776ad 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
@@ -32,12 +32,9 @@ import com.ning.billing.entitlement.api.test.EntitlementTestApi;
 import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
 import com.ning.billing.entitlement.api.user.EntitlementUserApi;
 import com.ning.billing.entitlement.api.user.SubscriptionApiService;
-import com.ning.billing.entitlement.engine.core.DefaultApiEventProcessor;
 import com.ning.billing.entitlement.engine.core.Engine;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.EntitlementSqlDao;
-import com.ning.billing.util.glue.ClockModule;
 
 
 
@@ -47,10 +44,7 @@ public class EntitlementModule extends AbstractModule {
         final EntitlementConfig config = new ConfigurationObjectFactory(System.getProperties()).build(EntitlementConfig.class);
         bind(EntitlementConfig.class).toInstance(config);
     }
-    
-    protected void installApiEventProcessor() {
-        bind(EventNotifier.class).to(DefaultApiEventProcessor.class).asEagerSingleton();
-    }
+
 
     protected void installEntitlementDao() {
         bind(EntitlementDao.class).to(EntitlementSqlDao.class).asEagerSingleton();
@@ -71,7 +65,6 @@ public class EntitlementModule extends AbstractModule {
     @Override
     protected void configure() {
         installConfig();
-        installApiEventProcessor();
         installEntitlementDao();
         installEntitlementCore();
     }
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 4540ad8..55ad7f4 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -14,24 +14,9 @@ CREATE TABLE events (
     plist_name varchar(64) DEFAULT NULL,
     current_version int(11) DEFAULT 1,
     is_active bool DEFAULT 1,
-    processing_owner char(36) DEFAULT NULL,
-    processing_available_dt datetime DEFAULT NULL,
-    processing_state varchar(14) DEFAULT 'AVAILABLE',
     PRIMARY KEY(id)
 ) ENGINE=innodb;
 
-DROP TABLE IF EXISTS claimed_events;
-CREATE TABLE claimed_events (
-    id int(11) unsigned NOT NULL AUTO_INCREMENT,    
-    sequence_id int(11) unsigned NOT NULL,    
-    owner_id char(36) NOT NULL,
-    hostname varchar(64) NOT NULL,
-    claimed_dt datetime NOT NULL,
-    event_id char(36) NOT NULL,
-    PRIMARY KEY(id)
-) ENGINE=innodb;
-
-
 DROP TABLE IF EXISTS subscriptions;
 CREATE TABLE subscriptions (
     id char(36) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 31e9eaf..704e2c7 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -1,8 +1,8 @@
 group EventSqlDao;
 
-getReadyEvents(now, max) ::= <<
-    select
-      event_id
+getEventById(event_id) ::= <<
+  select
+     event_id
       , event_type
       , user_type
       , created_dt
@@ -14,48 +14,11 @@ getReadyEvents(now, max) ::= <<
       , phase_name
       , plist_name
       , current_version
-      , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state
-    from events
-    where
-      effective_dt \<= :now
-      and is_active = 1
-      and processing_state != 'PROCESSED'
-      and (processing_owner IS NULL OR processing_available_dt \<= :now)
-    order by
-      effective_dt asc
-      , created_dt asc
-      , requested_dt asc
-      , id asc
-    limit :max
-    ;
->>
-
-claimEvent(owner, next_available, event_id, now) ::= <<
-    update events
-    set
-      processing_owner = :owner
-      , processing_available_dt = :next_available
-      , processing_state = 'IN_PROCESSING'
-    where
-      event_id = :event_id
-      and is_active = 1
-      and processing_state != 'PROCESSED'
-      and (processing_owner IS NULL OR processing_available_dt \<= :now)
-    ;
->>
-
-clearEvent(event_id, owner) ::= <<
-    update events
-    set
-      processing_owner = NULL
-      , processing_state = 'PROCESSED'
-    where
+      , is_active  
+  from events
+  where
       event_id = :event_id
-      and processing_owner = :owner
-    ;
+  ;
 >>
 
 insertEvent() ::= <<
@@ -73,9 +36,6 @@ insertEvent() ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state
     ) values (
       :event_id
       , :event_type
@@ -90,9 +50,6 @@ insertEvent() ::= <<
       , :plist_name
       , :current_version
       , :is_active
-      , :processing_owner
-      , :processing_available_dt
-      , :processing_state
     );   
 >>
 
@@ -103,22 +60,6 @@ removeEvents(subscription_id) ::= <<
     ;
 >>
 
-insertClaimedHistory(sequence_id, owner_id, hostname, claimed_dt, event_id) ::= <<
-    insert into claimed_events (
-        sequence_id
-        , owner_id
-        , hostname
-        , claimed_dt
-        , event_id
-      ) values (
-        :sequence_id
-        , :owner_id
-        , :hostname
-        , :claimed_dt
-        , :event_id
-      );
->>
-
 unactiveEvent(event_id, now) ::= <<
     update events
     set
@@ -154,9 +95,6 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state    
     from events
     where
       subscription_id = :subscription_id
@@ -185,9 +123,6 @@ getEventsForSubscription(subscription_id) ::= <<
       , plist_name
       , current_version
       , is_active
-      , processing_owner
-      , processing_available_dt
-      , processing_state       
     from events
     where
       subscription_id = :subscription_id
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
index f7d8186..68cae5a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/billing/BrainDeadMockEntitlementDao.java
@@ -99,18 +99,6 @@ class BrainDeadMockEntitlementDao implements EntitlementDao {
 	}
 
 	@Override
-	public List<EntitlementEvent> getEventsReady(UUID ownerId,
-			int sequenceId) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public void clearEventsReady(UUID ownerId,
-			Collection<EntitlementEvent> cleared) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
 	public void createSubscription(SubscriptionData subscription,
 			List<EntitlementEvent> initialEvents) {
 		throw new UnsupportedOperationException();
@@ -144,5 +132,10 @@ class BrainDeadMockEntitlementDao implements EntitlementDao {
 	public void undoMigration(UUID accountId) {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public EntitlementEvent getEventById(UUID eventId) {
+		throw new UnsupportedOperationException();
+	}
 	
 }
\ No newline at end of file
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 81696b0..b0483bb 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -94,7 +94,7 @@ public abstract class TestApiBase {
     protected ApiTestListener testListener;
     protected SubscriptionBundle bundle;
 
-    public static void loadSystemPropertiesFromClasspath( final String resource )
+    public static void loadSystemPropertiesFromClasspath(final String resource)
     {
         final URL url = TestApiBase.class.getResource(resource);
         assertNotNull(url);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
index 187c080..87491c7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
@@ -32,7 +32,7 @@ public class TestUserApiCancelSql extends TestUserApiCancel {
         return Guice.createInjector(Stage.DEVELOPMENT, new MockEngineModuleSql());
     }
 
-    @Test(enabled= true, groups={"stress"})
+    @Test(enabled= false, groups={"stress"})
     public void stressTest() {
         for (int i = 0; i < MAX_STRESS_ITERATIONS; i++) {
             cleanupTest();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index dc6567f..51c3d8b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -198,6 +198,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
             testListener.pushExpectedEvent(NextEvent.PHASE);
             clock.addDeltaFromReality(currentPhase.getDuration());
             DateTime futureNow = clock.getUTCNow();
+
+            /*
+            try {
+                Thread.sleep(1000 * 3000);
+            } catch (Exception e) {
+
+            }
+            */
+
             assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
             assertTrue(testListener.isCompleted(3000));
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
index 74ac1e7..3623da2 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
@@ -17,5 +17,6 @@
 package com.ning.billing.entitlement.engine.dao;
 
 public interface MockEntitlementDao {
+
     public void reset();
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 7fc8342..86e458f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -31,16 +31,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 
 import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
 import com.ning.billing.entitlement.events.EntitlementEvent;
 import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
 import com.ning.billing.entitlement.events.user.ApiEvent;
 import com.ning.billing.entitlement.events.user.ApiEventType;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlementDao {
 
@@ -52,15 +62,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     private final Clock clock;
     private final EntitlementConfig config;
     private final SubscriptionFactory factory;
-
-
+    private final NotificationQueueService notificationQueueService;
 
     @Inject
-    public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+    public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
         super();
         this.clock = clock;
         this.config = config;
         this.factory = factory;
+        this.notificationQueueService = notificationQueueService;
         this.bundles = new ArrayList<SubscriptionBundle>();
         this.subscriptions = new ArrayList<Subscription>();
         this.events = new TreeSet<EntitlementEvent>();
@@ -138,6 +148,14 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
 
         synchronized(events) {
             events.addAll(initalEvents);
+            for (final EntitlementEvent cur : initalEvents) {
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return cur.getId().toString();
+                    }
+                });
+            }
         }
         Subscription updatedSubscription = buildSubscription(subscription);
         subscriptions.add(updatedSubscription);
@@ -174,7 +192,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             List<EntitlementEvent> results = new LinkedList<EntitlementEvent>();
             for (EntitlementEvent cur : events) {
                 if (cur.isActive() &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE &&
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow()) &&
                             cur.getSubscriptionId().equals(subscriptionId)) {
                     results.add(cur);
                 }
@@ -202,39 +220,6 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
     }
 
 
-    @Override
-    public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId) {
-        synchronized(events) {
-            List<EntitlementEvent> readyList = new LinkedList<EntitlementEvent>();
-            for (EntitlementEvent cur : events) {
-                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
-
-                    if (cur.getOwner() != null) {
-                        log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
-                    }
-                    cur.setOwner(ownerId);
-                    cur.setNextAvailableDate(clock.getUTCNow().plus(config.getDaoClaimTimeMs()));
-                    cur.setProcessingState(EventLifecycleState.IN_PROCESSING);
-                    readyList.add(cur);
-                }
-            }
-            Collections.sort(readyList);
-            return readyList;
-        }
-    }
-
-    @Override
-    public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared) {
-        synchronized(events) {
-            for (EntitlementEvent cur : cleared) {
-                if (cur.getOwner().equals(ownerId)) {
-                    cur.setProcessingState(EventLifecycleState.PROCESSED);
-                } else {
-                    log.warn(String.format("EventProcessor %s trying to clear event %s that it does not own", ownerId, cur));
-                }
-            }
-        }
-    }
 
     private Subscription buildSubscription(SubscriptionData in) {
         return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId()));
@@ -272,12 +257,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
             cancelNextChangeEvent(subscriptionId);
             cancelNextPhaseEvent(subscriptionId);
             events.addAll(changeEvents);
+            for (final EntitlementEvent cur : changeEvents) {
+                recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return cur.getId().toString();
+                    }
+                });
+            }
         }
     }
 
-    private void insertEvent(EntitlementEvent event) {
+    private void insertEvent(final EntitlementEvent event) {
         synchronized(events) {
             events.add(event);
+            recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
+                @Override
+                public String toString() {
+                    return event.getId().toString();
+                }
+            });
         }
     }
 
@@ -298,10 +297,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                     continue;
                 }
                 if (cur.getType() == EventType.PHASE &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
                     cur.deactivate();
                     break;
                 }
+
             }
         }
     }
@@ -319,7 +319,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                 }
                 if (cur.getType() == EventType.API_USER &&
                         ApiEventType.CHANGE == ((ApiEvent) cur).getEventType() &&
-                        cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+                        cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
                     cur.deactivate();
                     break;
                 }
@@ -364,8 +364,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
                 SubscriptionBundleData bundleData = curBundle.getData();
                 for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
                     SubscriptionData subData = curSubscription.getData();
-                    for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+                    for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
                         events.add(curEvent);
+                        recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
+                            @Override
+                            public String toString() {
+                                return curEvent.getId().toString();
+                            }
+                        });
+
                     }
                     subscriptions.add(subData);
                 }
@@ -393,4 +400,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
         }
 
     }
+
+    @Override
+    public EntitlementEvent getEventById(UUID eventId) {
+        synchronized(events) {
+            for (EntitlementEvent cur : events) {
+                if (cur.getId().equals(eventId)) {
+                    return cur;
+                }
+            }
+        }
+        return null;
+    }
+
+    private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+        try {
+            NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+                Engine.NOTIFICATION_QUEUE_NAME);
+            subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+        } catch (NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2ebdea9..503fd1a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -20,6 +20,8 @@ import com.google.inject.Inject;
 import com.ning.billing.config.EntitlementConfig;
 import com.ning.billing.entitlement.api.user.SubscriptionFactory;
 import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -32,11 +34,12 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
     private final ResetSqlDao resetDao;
 
     @Inject
-    public MockEntitlementDaoSql(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
-        super(dbi, clock, config, factory);
+    public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+        super(dbi, clock, factory, notificationQueueService);
         this.resetDao = dbi.onDemand(ResetSqlDao.class);
     }
 
+
     @Override
     public void reset() {
         resetDao.inTransaction(new Transaction<Void, ResetSqlDao>() {
@@ -45,9 +48,10 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
             public Void inTransaction(ResetSqlDao dao, TransactionStatus status)
                     throws Exception {
                 resetDao.resetEvents();
-                resetDao.resetClaimedEvents();
                 resetDao.resetSubscriptions();
                 resetDao.resetBundles();
+                resetDao.resetClaimedNotifications();
+                resetDao.resetNotifications();
                 return null;
             }
         });
@@ -58,14 +62,17 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
         @SqlUpdate("truncate table events")
         public void resetEvents();
 
-        @SqlUpdate("truncate table claimed_events")
-        public void resetClaimedEvents();
-
         @SqlUpdate("truncate table subscriptions")
         public void resetSubscriptions();
 
         @SqlUpdate("truncate table bundles")
         public void resetBundles();
-    }
 
+        @SqlUpdate("truncate table notifications")
+        public void resetNotifications();
+
+        @SqlUpdate("truncate table claimed_notifications")
+        public void resetClaimedNotifications();
+
+    }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
index e9033b1..0f6fca3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
@@ -20,7 +20,6 @@ import com.ning.billing.account.glue.AccountModuleWithMocks;
 import com.ning.billing.catalog.glue.CatalogModule;
 import com.ning.billing.util.clock.MockClockModule;
 import com.ning.billing.util.glue.EventBusModule;
-import com.ning.billing.entitlement.glue.EntitlementModule;
 
 public class MockEngineModule extends EntitlementModule {
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index c6ce269..786f1e3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,16 +17,12 @@
 package com.ning.billing.entitlement.glue;
 
 
-import com.ning.billing.entitlement.engine.core.EventNotifier;
-import com.ning.billing.entitlement.engine.core.MockApiEventProcessorMemory;
 import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
 
 public class MockEngineModuleMemory extends MockEngineModule {
-    @Override
-    protected void installApiEventProcessor() {
-        bind(EventNotifier.class).to(MockApiEventProcessorMemory.class).asEagerSingleton();
-    }
 
     @Override
     protected void installEntitlementDao() {
@@ -34,8 +30,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
     }
 
 
+    private void installNotificationQueue() {
+        bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+    }
     @Override
     protected void configure() {
         super.configure();
+        installNotificationQueue();
     }
 }
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
index fb5c890..2532c6f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
@@ -22,6 +22,8 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
 import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoSql;
 import com.ning.billing.util.clock.Clock;
 import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.glue.NotificationQueueModule;
+
 import org.skife.config.ConfigurationObjectFactory;
 import org.skife.jdbi.v2.DBI;
 
@@ -42,6 +44,7 @@ public class MockEngineModuleSql extends MockEngineModule {
     @Override
     protected void configure() {
         installDBI();
+        install(new NotificationQueueModule());
         super.configure();
     }
 }
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index adc4669..e89e799 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -68,9 +68,6 @@ public class DefaultPaymentApi implements PaymentApi {
             if (account != null) {
                 return getPaymentProviderPlugin(account);
             }
-            else {
-                throw new IllegalArgumentException("Did not find account with accountKey " + accountKey);
-            }
         }
 
         return pluginRegistry.getPlugin(paymentProviderName);
@@ -105,21 +102,21 @@ public class DefaultPaymentApi implements PaymentApi {
     }
 
     @Override
-    public Either<PaymentError, String> addPaypalPaymentMethod(@Nullable String accountKey, PaypalPaymentMethodInfo paypalPaymentMethod) {
+    public Either<PaymentError, String> addPaymentMethod(@Nullable String accountKey, PaymentMethodInfo paymentMethod) {
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
-        return plugin.addPaypalPaymentMethod(accountKey, paypalPaymentMethod);
+        return plugin.addPaymentMethod(accountKey, paymentMethod);
     }
 
     @Override
     public Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId) {
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
-        return plugin.deletePaypalPaymentMethod(accountKey, paymentMethodId);
+        return plugin.deletePaymentMethod(accountKey, paymentMethodId);
     }
 
     @Override
     public Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo) {
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(accountKey);
-        return plugin.updatePaypalPaymentMethod(accountKey, paymentMethodInfo);
+        return plugin.updatePaymentMethod(accountKey, paymentMethodInfo);
     }
 
     @Override
@@ -182,9 +179,9 @@ public class DefaultPaymentApi implements PaymentApi {
     }
 
     @Override
-    public Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account) {
+    public Either<PaymentError, Void> updatePaymentProviderAccountContact(Account account) {
         final PaymentProviderPlugin plugin = getPaymentProviderPlugin(account);
-        return plugin.updatePaymentProviderAccount(account);
+        return plugin.updatePaymentProviderAccountExistingContact(account);
     }
 
     @Override
diff --git a/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java b/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
index 460951d..7a9ae71 100644
--- a/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
+++ b/payment/src/main/java/com/ning/billing/payment/provider/PaymentProviderPlugin.java
@@ -25,21 +25,22 @@ import com.ning.billing.payment.api.PaymentError;
 import com.ning.billing.payment.api.PaymentInfo;
 import com.ning.billing.payment.api.PaymentMethodInfo;
 import com.ning.billing.payment.api.PaymentProviderAccount;
-import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
 
 public interface PaymentProviderPlugin {
     Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice);
     Either<PaymentError, String> createPaymentProviderAccount(Account account);
-    Either<PaymentError, String> addPaypalPaymentMethod(String accountId, PaypalPaymentMethodInfo paypalPaymentMethod);
-    Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account);
 
     Either<PaymentError, PaymentInfo> getPaymentInfo(String paymentId);
-    Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId);
     Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey);
+    Either<PaymentError, Void> updatePaymentGateway(String accountKey);
+
+    Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId);
     Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountKey);
+    Either<PaymentError, String> addPaymentMethod(String accountKey, PaymentMethodInfo paymentMethod);
+    Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
+    Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId);
 
-    Either<PaymentError, Void> updatePaymentGateway(String accountKey);
-    Either<PaymentError, Void> deletePaypalPaymentMethod(String accountKey, String paymentMethodId);
-    Either<PaymentError, PaymentMethodInfo> updatePaypalPaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo);
+    Either<PaymentError, Void> updatePaymentProviderAccountExistingContact(Account account);
+    Either<PaymentError, Void> updatePaymentProviderAccountWithNewContact(Account account);
 
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index a1b02b3..c756963 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -25,7 +25,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -33,6 +35,7 @@ import org.testng.annotations.Test;
 import com.google.inject.Inject;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.account.api.user.AccountBuilder;
 import com.ning.billing.catalog.api.Currency;
 import com.ning.billing.invoice.api.Invoice;
 import com.ning.billing.invoice.model.DefaultInvoiceItem;
@@ -58,10 +61,9 @@ public abstract class TestPaymentApi {
         eventBus.stop();
     }
 
-//    @Test(groups = "fast")
     @Test
     public void testCreatePayment() throws AccountApiException {
-        final DateTime now = new DateTime();
+        final DateTime now = new DateTime(DateTimeZone.UTC);
         final Account account = testHelper.createTestAccount();
         final Invoice invoice = testHelper.createTestInvoice(account, now, Currency.USD);
         final BigDecimal amount = new BigDecimal("10.00");
@@ -84,6 +86,113 @@ public abstract class TestPaymentApi {
         PaymentInfo paymentInfo = results.get(0).getRight();
 
         assertNotNull(paymentInfo.getPaymentId());
-        assertEquals(paymentInfo.getAmount().doubleValue(), amount.doubleValue());
+        assertTrue(paymentInfo.getAmount().compareTo(amount) == 0);
+        assertNotNull(paymentInfo.getPaymentNumber());
+
+        PaymentAttempt paymentAttempt = paymentApi.getPaymentAttemptForPaymentId(paymentInfo.getPaymentId());
+        assertNotNull(paymentAttempt);
+        assertNotNull(paymentAttempt.getPaymentAttemptId());
+        assertEquals(paymentAttempt.getInvoiceId(), invoice.getId());
+        assertTrue(paymentAttempt.getAmount().compareTo(amount) == 0);
+        assertEquals(paymentAttempt.getCurrency(), Currency.USD);
+        assertEquals(paymentAttempt.getPaymentId(), paymentInfo.getPaymentId());
+        assertEquals(paymentAttempt.getPaymentAttemptDate().withMillisOfSecond(0), now.withMillisOfSecond(0));
+
+    }
+
+    private PaymentProviderAccount setupAccountWithPaymentMethod() throws AccountApiException {
+        final Account account = testHelper.createTestAccount();
+        paymentApi.createPaymentProviderAccount(account);
+
+        String accountKey = account.getExternalKey();
+
+        PaypalPaymentMethodInfo paymentMethod = new PaypalPaymentMethodInfo.Builder()
+                                                                           .setBaid("12345")
+                                                                           .setEmail(account.getEmail())
+                                                                           .setDefaultMethod(true)
+                                                                           .build();
+        Either<PaymentError, String> paymentMethodIdOrError = paymentApi.addPaymentMethod(accountKey, paymentMethod);
+
+        assertTrue(paymentMethodIdOrError.isRight());
+        assertNotNull(paymentMethodIdOrError.getRight());
+
+        Either<PaymentError, PaymentMethodInfo> paymentMethodInfoOrError = paymentApi.getPaymentMethod(accountKey, paymentMethodIdOrError.getRight());
+
+        assertTrue(paymentMethodInfoOrError.isRight());
+        assertNotNull(paymentMethodInfoOrError.getRight());
+
+        Either<PaymentError, PaymentProviderAccount> accountOrError = paymentApi.getPaymentProviderAccount(accountKey);
+
+        assertTrue(accountOrError.isRight());
+
+        return accountOrError.getRight();
+    }
+
+    @Test
+    public void testCreatePaymentMethod() throws AccountApiException {
+        PaymentProviderAccount account = setupAccountWithPaymentMethod();
+        assertNotNull(account);
+    }
+
+    @Test
+    public void testUpdatePaymentProviderAccountContact() throws AccountApiException {
+        final Account account = testHelper.createTestAccount();
+        paymentApi.createPaymentProviderAccount(account);
+
+        String newName = "Tester " + RandomStringUtils.randomAlphanumeric(10);
+        String newNumber = "888-888-" + RandomStringUtils.randomNumeric(4);
+
+        final Account accountToUpdate = new AccountBuilder(account.getId())
+                                                                  .name(newName)
+                                                                  .firstNameLength(newName.length())
+                                                                  .externalKey(account.getExternalKey())
+                                                                  .phone(newNumber)
+                                                                  .email(account.getEmail())
+                                                                  .currency(account.getCurrency())
+                                                                  .billingCycleDay(account.getBillCycleDay())
+                                                                  .build();
+
+        Either<PaymentError, Void> voidOrError = paymentApi.updatePaymentProviderAccountContact(accountToUpdate);
+        assertTrue(voidOrError.isRight());
+    }
+
+    @Test
+    public void testCannotDeleteDefaultPaymentMethod() throws AccountApiException {
+        PaymentProviderAccount account = setupAccountWithPaymentMethod();
+
+        Either<PaymentError, Void> errorOrVoid = paymentApi.deletePaymentMethod(account.getAccountKey(), account.getDefaultPaymentMethodId());
+
+        assertTrue(errorOrVoid.isLeft());
+    }
+
+    @Test
+    public void testDeleteNonDefaultPaymentMethod() throws AccountApiException {
+        final Account account = testHelper.createTestAccount();
+        paymentApi.createPaymentProviderAccount(account);
+
+        String accountKey = account.getExternalKey();
+
+        PaypalPaymentMethodInfo paymentMethod1 = new PaypalPaymentMethodInfo.Builder().setDefaultMethod(false).setBaid("12345").setEmail(account.getEmail()).build();
+        Either<PaymentError, String> paymentMethodIdOrError1 = paymentApi.addPaymentMethod(accountKey, paymentMethod1);
+
+        assertTrue(paymentMethodIdOrError1.isRight());
+        assertNotNull(paymentMethodIdOrError1.getRight());
+
+        PaypalPaymentMethodInfo paymentMethod2 = new PaypalPaymentMethodInfo.Builder().setDefaultMethod(true).setBaid("12345").setEmail(account.getEmail()).build();
+
+        Either<PaymentError, String> paymentMethodIdOrError2 = paymentApi.addPaymentMethod(accountKey, paymentMethod2);
+
+        assertTrue(paymentMethodIdOrError2.isRight());
+        assertNotNull(paymentMethodIdOrError2.getRight());
+
+        Either<PaymentError, List<PaymentMethodInfo>> paymentMethodsOrError = paymentApi.getPaymentMethods(accountKey);
+
+        assertTrue(paymentMethodsOrError.isRight());
+
+        Either<PaymentError, Void> errorOrVoid1 = paymentApi.deletePaymentMethod(accountKey, paymentMethodIdOrError1.getRight());
+        Either<PaymentError, Void> errorOrVoid2 = paymentApi.deletePaymentMethod(accountKey, paymentMethodIdOrError2.getRight());
+
+        assertTrue(errorOrVoid1.isRight());
+        assertTrue(errorOrVoid2.isLeft());
     }
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
index 32d0d71..97e6136 100644
--- a/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
+++ b/payment/src/test/java/com/ning/billing/payment/provider/MockPaymentProviderPlugin.java
@@ -16,16 +16,21 @@
 
 package com.ning.billing.payment.provider;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.DateTime;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.ning.billing.account.api.Account;
 import com.ning.billing.invoice.api.Invoice;
+import com.ning.billing.payment.api.CreditCardPaymentMethodInfo;
 import com.ning.billing.payment.api.Either;
 import com.ning.billing.payment.api.PaymentError;
 import com.ning.billing.payment.api.PaymentInfo;
@@ -36,19 +41,20 @@ import com.ning.billing.payment.api.PaypalPaymentMethodInfo;
 public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     private final Map<String, PaymentInfo> payments = new ConcurrentHashMap<String, PaymentInfo>();
     private final Map<String, PaymentProviderAccount> accounts = new ConcurrentHashMap<String, PaymentProviderAccount>();
+    private final Map<String, PaymentMethodInfo> paymentMethods = new ConcurrentHashMap<String, PaymentMethodInfo>();
 
     @Override
     public Either<PaymentError, PaymentInfo> processInvoice(Account account, Invoice invoice) {
         PaymentInfo payment = new PaymentInfo.Builder().setPaymentId(UUID.randomUUID().toString())
-                                            .setAmount(invoice.getAmountOutstanding())
-                                            .setStatus("Processed")
-                                            .setBankIdentificationNumber("1234")
-                                            .setCreatedDate(new DateTime())
-                                            .setEffectiveDate(new DateTime())
-                                            .setPaymentNumber("12345")
-                                            .setReferenceId("12345")
-                                            .setType("Electronic")
-                                            .build();
+                                             .setAmount(invoice.getAmountOutstanding())
+                                             .setStatus("Processed")
+                                             .setBankIdentificationNumber("1234")
+                                             .setCreatedDate(new DateTime())
+                                             .setEffectiveDate(new DateTime())
+                                             .setPaymentNumber("12345")
+                                             .setReferenceId("12345")
+                                             .setType("Electronic")
+                                             .build();
 
         payments.put(payment.getPaymentId(), payment);
         return Either.right(payment);
@@ -69,13 +75,13 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     @Override
     public Either<PaymentError, String> createPaymentProviderAccount(Account account) {
         if (account != null) {
-            PaymentProviderAccount paymentProviderAccount = accounts.put(account.getExternalKey(),
-                                                                         new PaymentProviderAccount.Builder().setAccountNumber(String.valueOf(RandomUtils.nextInt(10)))
-                                                                                                             .setDefaultPaymentMethod(String.valueOf(RandomUtils.nextInt(10)))
-                                                                                                             .setId(String.valueOf(RandomUtils.nextInt(10)))
-                                                                                                             .build());
+            String id = String.valueOf(RandomStringUtils.randomAlphanumeric(10));
+            accounts.put(account.getExternalKey(),
+                         new PaymentProviderAccount.Builder().setAccountKey(account.getExternalKey())
+                                                             .setId(id)
+                                                             .build());
 
-            return Either.right(paymentProviderAccount.getId());
+            return Either.right(id);
         }
         else {
             return Either.left(new PaymentError("unknown", "Did not get account to create payment provider account"));
@@ -83,51 +89,167 @@ public class MockPaymentProviderPlugin implements PaymentProviderPlugin {
     }
 
     @Override
-    public Either<PaymentError, PaymentProviderAccount> updatePaymentProviderAccount(Account account) {
-        // TODO Auto-generated method stub
-        return null;
+    public Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey) {
+        if (accountKey != null) {
+            return Either.right(accounts.get(accountKey));
+        }
+        else {
+            return Either.left(new PaymentError("unknown", "Did not get account for accountKey " + accountKey));
+        }
     }
 
     @Override
-    public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, String> addPaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+        if (paymentMethod != null) {
+            PaymentProviderAccount account = accounts.get(accountKey);
+
+            if (account != null && account.getId() != null) {
+                String existingDefaultMethod = account.getDefaultPaymentMethodId();
+
+                String paymentMethodId = RandomStringUtils.randomAlphanumeric(10);
+                boolean shouldBeDefault = Boolean.TRUE.equals(paymentMethod.getDefaultMethod()) || existingDefaultMethod == null;
+                PaymentMethodInfo realPaymentMethod = null;
+
+                if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+                    PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+
+                    realPaymentMethod = new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod)
+                                                                   .setId(paymentMethodId)
+                                                                   .setAccountId(accountKey)
+                                                                   .setDefaultMethod(shouldBeDefault)
+                                                                   .setBaid(paypalPaymentMethod.getBaid())
+                                                                   .setEmail(paypalPaymentMethod.getEmail())
+                                                                   .build();
+                }
+                else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+                    CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+                    realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setId(paymentMethodId).build();
+                }
+                if (realPaymentMethod == null) {
+                    return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+                }
+                else {
+                    if (shouldBeDefault) {
+                        setDefaultPaymentMethodOnAccount(account, paymentMethodId);
+                    }
+                    paymentMethods.put(paymentMethodId, realPaymentMethod);
+                    return Either.right(paymentMethodId);
+                }
+            }
+                else {
+                    return Either.left(new PaymentError("noaccount", "Could not retrieve account for accountKey " + accountKey));
+                }
+        }
+        else {
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+        }
+    }
+
+    public void setDefaultPaymentMethodOnAccount(PaymentProviderAccount account, String paymentMethodId) {
+        if (paymentMethodId != null && account != null) {
+            accounts.put(account.getAccountKey(),
+                new PaymentProviderAccount.Builder()
+                                          .copyFrom(account)
+                                          .setDefaultPaymentMethod(paymentMethodId)
+                                          .build());
+            List<PaymentMethodInfo> paymentMethodsToUpdate = new ArrayList<PaymentMethodInfo>();
+            for (PaymentMethodInfo paymentMethod : paymentMethods.values()) {
+                if (account.getAccountKey().equals(paymentMethod.getAccountId()) && !paymentMethodId.equals(paymentMethod.getId())) {
+                    if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+                        PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+                        paymentMethodsToUpdate.add(new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod).setDefaultMethod(false).build());
+                    }
+                    else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+                        CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+                        paymentMethodsToUpdate.add(new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).setDefaultMethod(false).build());
+                    }
+                }
+            }
+            for (PaymentMethodInfo paymentMethod : paymentMethodsToUpdate) {
+                paymentMethods.put(paymentMethod.getId(), paymentMethod);
+            }
+        }
+    }
+
+    @Override
+    public Either<PaymentError, PaymentMethodInfo> updatePaymentMethod(String accountKey, PaymentMethodInfo paymentMethod) {
+        if (paymentMethod != null) {
+            PaymentMethodInfo realPaymentMethod = null;
+
+            if (paymentMethod instanceof PaypalPaymentMethodInfo) {
+                PaypalPaymentMethodInfo paypalPaymentMethod = (PaypalPaymentMethodInfo)paymentMethod;
+                realPaymentMethod = new PaypalPaymentMethodInfo.Builder(paypalPaymentMethod).build();
+            }
+            else if (paymentMethod instanceof CreditCardPaymentMethodInfo) {
+                CreditCardPaymentMethodInfo ccPaymentMethod = (CreditCardPaymentMethodInfo)paymentMethod;
+                realPaymentMethod = new CreditCardPaymentMethodInfo.Builder(ccPaymentMethod).build();
+            }
+            if (realPaymentMethod == null) {
+                return Either.left(new PaymentError("unsupported", "Payment method " + paymentMethod.getType() + " not supported by the plugin"));
+            }
+            else {
+                paymentMethods.put(paymentMethod.getId(), paymentMethod);
+                return Either.right(realPaymentMethod);
+            }
+        }
+        else {
+            return Either.left(new PaymentError("unknown", "Could not create add payment method " + paymentMethod + " for " + accountKey));
+        }
     }
 
     @Override
-    public Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(String accountId) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, Void> deletePaymentMethod(String accountKey, String paymentMethodId) {
+        PaymentMethodInfo paymentMethodInfo = paymentMethods.get(paymentMethodId);
+        if (paymentMethodInfo != null) {
+            if (Boolean.FALSE.equals(paymentMethodInfo.getDefaultMethod()) || paymentMethodInfo.getDefaultMethod() == null) {
+                if (paymentMethods.remove(paymentMethodId) == null) {
+                    return Either.left(new PaymentError("unknown", "Did not get any result back"));
+                }
+            }
+            else {
+                return Either.left(new PaymentError("error", "Cannot delete default payment method"));
+            }
+        }
+        return Either.right(null);
     }
 
     @Override
-    public Either<PaymentError, Void> updatePaymentGateway(String accountKey) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, PaymentMethodInfo> getPaymentMethodInfo(String paymentMethodId) {
+        if (paymentMethodId == null) {
+            return Either.left(new PaymentError("unknown", "Could not retrieve payment method for paymentMethodId " + paymentMethodId));
+        }
+
+        return Either.right(paymentMethods.get(paymentMethodId));
     }
 
     @Override
-    public Either<PaymentError, PaymentProviderAccount> getPaymentProviderAccount(String accountKey) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, List<PaymentMethodInfo>> getPaymentMethods(final String accountKey) {
+
+        Collection<PaymentMethodInfo> filteredPaymentMethods = Collections2.filter(paymentMethods.values(), new Predicate<PaymentMethodInfo>() {
+            @Override
+            public boolean apply(PaymentMethodInfo input) {
+                return accountKey.equals(input.getAccountId());
+            }
+        });
+        List<PaymentMethodInfo> result = new ArrayList<PaymentMethodInfo>(filteredPaymentMethods);
+        return Either.right(result);
     }
 
     @Override
-    public Either<PaymentError, String> addPaypalPaymentMethod(String accountId, PaypalPaymentMethodInfo paypalPaymentMethod) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, Void> updatePaymentGateway(String accountKey) {
+        return Either.right(null);
     }
 
     @Override
-    public Either<PaymentError, Void> deletePaypalPaymentMethod(String accountKey, String paymentMethodId) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, Void> updatePaymentProviderAccountExistingContact(Account account) {
+        // nothing to do here
+        return Either.right(null);
     }
 
     @Override
-    public Either<PaymentError, PaymentMethodInfo> updatePaypalPaymentMethod(String accountKey, PaymentMethodInfo paymentMethodInfo) {
-        // TODO
-        return Either.left(new PaymentError("unknown", "Not implemented"));
+    public Either<PaymentError, Void> updatePaymentProviderAccountWithNewContact(Account account) {
+        // nothing to do here
+        return Either.right(null);
     }
 
 }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index 862d98c..2a1ce91 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -46,10 +46,11 @@ public class TestHelper {
     }
 
     public Account createTestAccount() throws AccountApiException {
-        final String name = "First" + RandomStringUtils.random(5) + " " + "Last" + RandomStringUtils.random(5);
+        final String name = "First" + RandomStringUtils.randomAlphanumeric(5) + " " + "Last" + RandomStringUtils.randomAlphanumeric(5);
+        final String externalKey = RandomStringUtils.randomAlphanumeric(10);
         final Account account = new AccountBuilder(UUID.randomUUID()).name(name)
                                                                      .firstNameLength(name.length())
-                                                                     .externalKey("12345")
+                                                                     .externalKey(externalKey)
                                                                      .phone("123-456-7890")
                                                                      .email("user@example.com")
                                                                      .currency(Currency.USD)

pom.xml 1(+1 -0)

diff --git a/pom.xml b/pom.xml
index 7b8944a..eb22569 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,6 +370,7 @@
                                 <exclude>**/.project</exclude>
                                 <exclude>.git/**</exclude>
                                 <exclude>.gitignore</exclude>
+                                <exclude>ignore/**</exclude>
                                 <exclude>API.txt</exclude>
                                 <exclude>RELEASE.sh</exclude>
                                 <exclude>deploy.sh</exclude>

util/pom.xml 15(+14 -1)

diff --git a/util/pom.xml b/util/pom.xml
index c2894a7..2b23ebb 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -33,6 +33,10 @@
             <artifactId>jdbi</artifactId>
         </dependency>
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
             <version>3.0</version>
@@ -83,7 +87,16 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-        
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>management-dbfiles</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/util/src/main/java/com/ning/billing/util/glue/ClockModule.java b/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
index f090bba..e7c7c3f 100644
--- a/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/ClockModule.java
@@ -26,5 +26,4 @@ public class ClockModule extends AbstractModule {
 	protected void configure() {
 		bind(Clock.class).to(DefaultClock.class).asEagerSingleton();
 	}
-
 }
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
new file mode 100644
index 0000000..f0babf9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+public class NotificationQueueModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
new file mode 100644
index 0000000..818d831
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+
+@ExternalizedSqlViaStringTemplate3()
+public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
+
+    //
+    // APIs for event notifications
+    //
+    @SqlQuery
+    @Mapper(NotificationSqlMapper.class)
+    public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+
+    @SqlUpdate
+    public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+
+    @SqlUpdate
+    public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+
+    @SqlUpdate
+    public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
+
+    @SqlUpdate
+    public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner") String owner, @Bind("claimed_dt") Date clainedDate, @Bind("notification_id") String notificationId);
+
+    public static class NotificationSqlDaoBinder implements Binder<Bind, Notification> {
+
+        private Date getDate(DateTime dateTime) {
+            return dateTime == null ? null : dateTime.toDate();
+        }
+
+        @Override
+        public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
+            stmt.bind("notification_id", evt.getId().toString());
+            stmt.bind("created_dt", getDate(new DateTime()));
+            stmt.bind("notification_key", evt.getNotificationKey());
+            stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+            stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
+            stmt.bind("processing_owner", (String) null);
+            stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+        }
+    }
+
+
+    public static class NotificationSqlMapper implements ResultSetMapper<Notification> {
+
+        private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
+            final Timestamp resultStamp = r.getTimestamp(fieldName);
+            return r.wasNull() ? null : new DateTime(resultStamp).toDateTime(DateTimeZone.UTC);
+        }
+
+        @Override
+        public Notification map(int index, ResultSet r, StatementContext ctx)
+        throws SQLException {
+
+            final UUID id = UUID.fromString(r.getString("notification_id"));
+            final String notificationKey = r.getString("notification_key");
+            final DateTime effectiveDate = getDate(r, "effective_dt");
+            final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
+            final String processingOwner = r.getString("processing_owner");
+            final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+
+            return new DefaultNotification(id, processingOwner, nextAvailableDate,
+                    processingState, notificationKey, effectiveDate);
+
+        }
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
new file mode 100644
index 0000000..2946e13
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class DefaultNotification implements Notification {
+
+    private final UUID id;
+    private final String owner;
+    private final DateTime nextAvailableDate;
+    private final NotificationLifecycleState lifecycleState;
+    private final String notificationKey;
+    private final DateTime effectiveDate;
+
+
+    public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
+            NotificationLifecycleState lifecycleState,
+            String notificationKey, DateTime effectiveDate) {
+        super();
+        this.id = id;
+        this.owner = owner;
+        this.nextAvailableDate = nextAvailableDate;
+        this.lifecycleState = lifecycleState;
+        this.notificationKey = notificationKey;
+        this.effectiveDate = effectiveDate;
+    }
+
+    public DefaultNotification(String notificationKey, DateTime effectiveDate) {
+        this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    }
+
+    @Override
+    public UUID getId() {
+        return id;
+    }
+
+    @Override
+    public String getOwner() {
+        return owner;
+    }
+
+    @Override
+    public DateTime getNextAvailableDate() {
+        return nextAvailableDate;
+    }
+
+    @Override
+    public NotificationLifecycleState getProcessingState() {
+        return lifecycleState;
+    }
+
+    @Override
+    public boolean isAvailableForProcessing(DateTime now) {
+        switch(lifecycleState) {
+        case AVAILABLE:
+            break;
+        case IN_PROCESSING:
+            // Somebody already got the event, not available yet
+            if (nextAvailableDate.isAfter(now)) {
+                return false;
+            }
+            break;
+        case PROCESSED:
+            return false;
+        default:
+            throw new RuntimeException(String.format("Unkwnon IEvent processing state %s", lifecycleState));
+        }
+        return effectiveDate.isBefore(now);
+    }
+
+    @Override
+    public String getNotificationKey() {
+        return notificationKey;
+    }
+
+    @Override
+    public DateTime getEffectiveDate() {
+        return effectiveDate;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
new file mode 100644
index 0000000..16d28b2
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+public class DefaultNotificationQueue extends NotificationQueueBase {
+
+    protected final NotificationSqlDao dao;
+
+    public DefaultNotificationQueue(final IDBI dbi, final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        super(clock, svcName, queueName, handler, config);
+        this.dao = dbi.onDemand(NotificationSqlDao.class);
+    }
+
+    @Override
+    protected void doProcessEvents(int sequenceId) {
+        List<Notification> notifications = getReadyNotifications(sequenceId);
+        for (Notification cur : notifications) {
+            nbProcessedEvents.incrementAndGet();
+            handler.handleReadyNotification(cur.getNotificationKey());
+        }
+        // If anything happens before we get to clear those notifications, somebody else will pick them up
+        clearNotifications(notifications);
+    }
+
+    @Override
+    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+            final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+        NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
+        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        transactionalNotificationDao.insertNotification(notification);
+    }
+
+
+    private void clearNotifications(final Collection<Notification> cleared) {
+
+        log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+                getFullQName(),
+                cleared.size()));
+
+        dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+            @Override
+            public Void inTransaction(NotificationSqlDao transactional,
+                    TransactionStatus status) throws Exception {
+                for (Notification cur : cleared) {
+                    transactional.clearNotification(cur.getId().toString(), hostname);
+                    log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+                }
+                return null;
+            }
+        });
+    }
+
+    private List<Notification> getReadyNotifications(final int seqId) {
+
+        final Date now = clock.getUTCNow().toDate();
+        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+        log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow =  %s",  getFullQName(), now));
+
+        List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+            @Override
+            public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+                    TransactionStatus status) throws Exception {
+
+                List<Notification> claimedNotifications = new ArrayList<Notification>();
+                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+                for (Notification cur : input) {
+                    final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+                    if (claimed) {
+                        claimedNotifications.add(cur);
+                        transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+                    }
+                }
+                return claimedNotifications;
+            }
+        });
+
+        for (Notification cur : result) {
+            log.debug(String.format("NotificationQueue %sclaimed events %s",
+                    getFullQName(), cur.getId()));
+            if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+                log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+                        getFullQName(), cur, cur.getOwner()));
+            }
+        }
+        return result;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
new file mode 100644
index 0000000..fe18ead
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import org.skife.jdbi.v2.IDBI;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
+    private final IDBI dbi;
+
+    @Inject
+    public DefaultNotificationQueueService(final IDBI dbi, final Clock clock) {
+        super(clock);
+        this.dbi = dbi;
+    }
+
+    @Override
+    protected NotificationQueue createNotificationQueueInternal(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config) {
+        return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config);
+    }
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
new file mode 100644
index 0000000..23f0de0
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public interface NotificationQueue {
+
+    /**
+    *
+    *  Record from within a transaction the need to be called back when the notification is ready
+    *
+    * @param transactionalDao the transactionalDao
+    * @param futureNotificationTime the time at which the notification is ready
+    * @param notificationKey the key for that notification
+    */
+   public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+           final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+   /**
+    * This is only valid when the queue has been configured with isNotificationProcessingOff is true
+    * In which case, it will callback users for all the ready notifications.
+    *
+    */
+   public void processReadyNotification();
+
+   /**
+    * Stops the queue.
+    *
+    * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+    */
+   public void stopQueue();
+
+   /**
+    * Starts the queue.
+    *
+    * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+    */
+   public void startQueue();
+
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
new file mode 100644
index 0000000..6eaf33f
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public abstract class NotificationQueueBase implements NotificationQueue {
+
+    protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+
+    protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+    protected final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+    protected final String svcName;
+    protected final String queueName;
+    protected final NotificationQueueHandler handler;
+    protected final NotificationConfig config;
+
+    protected final Executor executor;
+    protected final Clock clock;
+    protected final String hostname;
+
+    protected static final AtomicInteger sequenceId = new AtomicInteger();
+
+    protected AtomicLong nbProcessedEvents;
+
+    // Use this object's monitor for synchronization (no need for volatile)
+    protected boolean isProcessingEvents;
+
+    // Package visibility on purpose
+    NotificationQueueBase(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        this.clock = clock;
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.config = config;
+        this.hostname = Hostname.get();
+
+        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread th = new Thread(r);
+                th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+                th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+                    @Override
+                    public void uncaughtException(Thread t, Throwable e) {
+                        log.error("Uncaught exception for thread " + t.getName(), e);
+                    }
+                });
+                return th;
+            }
+        });
+    }
+
+
+    @Override
+    public void processReadyNotification() {
+        // STEPH to be implemented
+    }
+
+
+    @Override
+    public void stopQueue() {
+        if (config.isNotificationProcessingOff()) {
+            handler.completedQueueStop();
+            return;
+        }
+
+        synchronized(this) {
+            isProcessingEvents = false;
+            try {
+                log.info("NotificationQueue requested to stop");
+                wait(STOP_WAIT_TIMEOUT_MS);
+                log.info("NotificationQueue requested should have exited");
+            } catch (InterruptedException e) {
+                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void startQueue() {
+
+        this.isProcessingEvents = true;
+        this.nbProcessedEvents = new AtomicLong();
+
+
+        if (config.isNotificationProcessingOff()) {
+            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+            handler.completedQueueStart();
+            return;
+        }
+        final NotificationQueueBase notificationQueue = this;
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+
+                log.info(String.format("NotificationQueue thread %s [%d] started",
+                        Thread.currentThread().getName(),
+                        Thread.currentThread().getId()));
+
+                // Thread is now started, notify the listener
+                handler.completedQueueStart();
+
+                try {
+                    while (true) {
+
+                        synchronized (notificationQueue) {
+                            if (!isProcessingEvents) {
+                                log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...",
+                                        Thread.currentThread().getName(),
+                                        Thread.currentThread().getId()));
+                                notificationQueue.notify();
+                                break;
+                            }
+                        }
+
+                        // Callback may trigger exceptions in user code so catch anything here and live with it.
+                        try {
+                            doProcessEvents(sequenceId.getAndIncrement());
+                        } catch (Exception e) {
+                            log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..",
+                                    Thread.currentThread().getName(),
+                                    Thread.currentThread().getId()), e);
+                        }
+                        sleepALittle();
+                    }
+                } catch (InterruptedException e) {
+                    log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+                } catch (Throwable e) {
+                    log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+                    // Just to make it really obvious in the log
+                    e.printStackTrace();
+                } finally {
+                    handler.completedQueueStop();
+                    log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
+                            Thread.currentThread().getName(),
+                            Thread.currentThread().getId()));
+                }
+            }
+
+            private void sleepALittle() throws InterruptedException {
+                Thread.sleep(config.getNotificationSleepTimeMs());
+            }
+        });
+    }
+
+
+    protected String getFullQName() {
+        return svcName + ":" +  queueName;
+    }
+
+    protected abstract void doProcessEvents(int sequenceId);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
new file mode 100644
index 0000000..a18906b
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.NoSuchElementException;
+
+
+public interface NotificationQueueService {
+
+    public interface NotificationQueueHandler {
+        /**
+         * Called when the Notification thread has been started
+         */
+        public void completedQueueStart();
+
+        /**
+         * Called for each notification ready
+         *
+         * @param key the notification key associated to that notification entry
+         */
+        public void handleReadyNotification(String notificationKey);
+        /**
+         * Called right before the Notification thread is about to exit
+         */
+        public void completedQueueStop();
+    }
+
+    public static final class NotficationQueueAlreadyExists extends Exception {
+        private static final long serialVersionUID = 1541281L;
+
+        public NotficationQueueAlreadyExists(String msg) {
+            super(msg);
+        }
+    }
+
+    public static final class NoSuchNotificationQueue extends Exception {
+        private static final long serialVersionUID = 1541281L;
+
+        public NoSuchNotificationQueue(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Creates a new NotificationQueue for a given associated with the given service and queueName
+     *
+     * @param svcName the name of the service using that queue
+     * @param queueName a name for that queue (unique per service)
+     * @param handler the handler required for notifying the caller of state change
+     * @param config the notification queue configuration
+     *
+     * @return a new NotificationQueue
+     *
+     * @throws NotficationQueueAlreadyExists is the queue associated with that service and name already exits
+     *
+     */
+    NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+        throws NotficationQueueAlreadyExists;
+
+    /**
+     * Retrieves an already created NotificationQueue by service and name if it exists
+     *
+     * @param svcName
+     * @param queueName
+     * @return
+     *
+     * @throws NoSuchNotificationQueue if queue does not exist
+     */
+    NotificationQueue getNotificationQueue(final String svcName, final String queueName)
+        throws NoSuchNotificationQueue;
+
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
new file mode 100644
index 0000000..a4dc64e
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public abstract class NotificationQueueServiceBase implements NotificationQueueService {
+
+    protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+    protected final Clock clock;
+
+    private final Map<String, NotificationQueue> queues;
+
+    @Inject
+    public NotificationQueueServiceBase(final Clock clock) {
+
+        this.clock = clock;
+        this.queues = new TreeMap<String, NotificationQueue>();
+    }
+
+    @Override
+    public NotificationQueue createNotificationQueue(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config) throws NotficationQueueAlreadyExists {
+        if (svcName == null || queueName == null || handler == null || config == null) {
+            throw new RuntimeException("Need to specify all parameters");
+        }
+
+        String compositeName = getCompositeName(svcName, queueName);
+        NotificationQueue result = null;
+        synchronized(queues) {
+            result = queues.get(compositeName);
+            if (result != null) {
+                throw new NotficationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
+                        svcName, queueName));
+            }
+            result = createNotificationQueueInternal(svcName, queueName, handler, config);
+            queues.put(compositeName, result);
+        }
+        return result;
+    }
+
+    @Override
+    public NotificationQueue getNotificationQueue(String svcName,
+            String queueName) throws NoSuchNotificationQueue {
+
+        NotificationQueue result = null;
+        String compositeName = getCompositeName(svcName, queueName);
+        synchronized(queues) {
+            result = queues.get(compositeName);
+            if (result == null) {
+                throw new NoSuchNotificationQueue(String.format("Queue for svc %s and name %s does not exist",
+                        svcName, queueName));
+            }
+        }
+        return result;
+    }
+
+
+    protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config);
+
+
+    private String getCompositeName(String svcName, String queueName) {
+        return svcName + ":" + queueName;
+    }
+}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index c0bff95..a65e0cd 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -32,4 +32,31 @@ CREATE TABLE tags (
   PRIMARY KEY(id)
 ) ENGINE = innodb;
 CREATE INDEX tags_by_object ON tags(object_id);
-CREATE UNIQUE INDEX tags_unique ON tags(tag_definition_name, object_id);
\ No newline at end of file
+CREATE UNIQUE INDEX tags_unique ON tags(tag_definition_name, object_id);
+
+DROP TABLE IF EXISTS notifications;
+CREATE TABLE notifications (
+    id int(11) unsigned NOT NULL AUTO_INCREMENT,
+    notification_id char(36) NOT NULL,
+    created_dt datetime NOT NULL,
+	notification_key varchar(256) NOT NULL,
+    effective_dt datetime NOT NULL,
+    processing_owner char(36) DEFAULT NULL,
+    processing_available_dt datetime DEFAULT NULL,
+    processing_state varchar(14) DEFAULT 'AVAILABLE',
+    PRIMARY KEY(id)
+) ENGINE=innodb;
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX  `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX  `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX  `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
+
+DROP TABLE IF EXISTS claimed_notifications;
+CREATE TABLE claimed_notifications (
+    id int(11) unsigned NOT NULL AUTO_INCREMENT,    
+    sequence_id int(11) unsigned NOT NULL,    
+    owner_id varchar(64) NOT NULL,
+    claimed_dt datetime NOT NULL,
+    notification_id char(36) NOT NULL,
+    PRIMARY KEY(id)
+) ENGINE=innodb;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
new file mode 100644
index 0000000..5a44431
--- /dev/null
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -0,0 +1,83 @@
+group NotificationSqlDao;
+
+getReadyNotifications(now, max) ::= <<
+    select
+      notification_id
+    , notification_key
+      , created_dt
+      , effective_dt
+      , processing_owner
+      , processing_available_dt
+      , processing_state
+    from notifications
+    where
+      effective_dt \<= :now
+      and processing_state != 'PROCESSED'
+      and (processing_owner IS NULL OR processing_available_dt \<= :now)
+    order by
+      effective_dt asc
+      , created_dt asc
+      , id asc
+    limit :max
+    ;
+>>
+
+
+claimNotification(owner, next_available, notification_id, now) ::= <<
+    update notifications
+    set
+      processing_owner = :owner
+      , processing_available_dt = :next_available
+      , processing_state = 'IN_PROCESSING'
+    where
+      notification_id = :notification_id
+      and processing_state != 'PROCESSED'
+      and (processing_owner IS NULL OR processing_available_dt \<= :now)
+    ;
+>>
+
+clearNotification(notification_id, owner) ::= <<
+    update notifications
+    set
+      processing_owner = NULL
+      , processing_state = 'PROCESSED'
+    where
+      notification_id = :notification_id
+      and processing_owner = :owner
+    ;
+>>
+
+insertNotification() ::= <<
+    insert into notifications (
+      notification_id
+    , notification_key
+      , created_dt
+      , effective_dt
+      , processing_owner
+      , processing_available_dt
+      , processing_state
+    ) values (
+      :notification_id
+      , :notification_key
+      , :created_dt
+      , :effective_dt
+      , :processing_owner
+      , :processing_available_dt
+      , :processing_state
+    );   
+>>
+
+
+insertClaimedHistory(sequence_id, owner, hostname, claimed_dt, notification_id) ::= <<
+    insert into claimed_notifications (
+        sequence_id
+        , owner_id
+        , claimed_dt
+        , notification_id
+      ) values (
+        :sequence_id
+        , :owner
+        , :claimed_dt
+        , :notification_id
+      );
+>>
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index 0fb473d..7698697 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -76,6 +76,15 @@ public class ClockMock extends DefaultClock {
         deltaFromRealityMs = delta;
     }
 
+    public synchronized void addDeltaFromReality(long delta) {
+        if (deltaType != DeltaType.DELTA_ABS) {
+            throw new RuntimeException("ClockMock should be set with type DELTA_ABS");
+        }
+        deltaFromRealityDuration = null;
+        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityMs += delta;
+    }
+
     public synchronized void resetDeltaFromReality() {
         deltaType = DeltaType.DELTA_NONE;
         deltaFromRealityDuration = null;
diff --git a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
index 1419b8f..ba9f76a 100644
--- a/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/eventbus/TestEventBus.java
@@ -81,7 +81,7 @@ public class TestEventBus {
         @Subscribe
         public synchronized void processEvent(MyEvent event) {
             gotEvents++;
-            log.info("Got event {} {}", event.name, event.value);
+            //log.debug("Got event {} {}", event.name, event.value);
         }
 
         public synchronized boolean waitForCompletion(long timeoutMs) {
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
new file mode 100644
index 0000000..89dfd76
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+
+@Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
+public class TestNotificationSqlDao {
+
+    private static AtomicInteger sequenceId = new AtomicInteger();
+
+    @Inject
+    private DBI dbi;
+
+    @Inject
+    MysqlTestingHelper helper;
+
+    private NotificationSqlDao dao;
+
+    private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+
+
+        final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+        helper.startMysql();
+        helper.initDb(ddl);
+    }
+
+    @BeforeSuite(alwaysRun = true)
+    public void setup()  {
+        try {
+            startMysql();
+            dao = dbi.onDemand(NotificationSqlDao.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @AfterSuite(alwaysRun = true)
+    public void stopMysql()
+    {
+        helper.stopMysql();
+    }
+
+
+    @BeforeTest
+    public void cleanupDb() {
+        dbi.withHandle(new HandleCallback<Void>() {
+
+            @Override
+            public Void withHandle(Handle handle) throws Exception {
+                handle.execute("delete from notifications");
+                handle.execute("delete from claimed_notifications");
+                return null;
+            }
+        });
+    }
+
+    @Test
+    public void testBasic() throws InterruptedException {
+
+        final String ownerId = UUID.randomUUID().toString();
+
+        String notificationKey = UUID.randomUUID().toString();
+        DateTime effDt = new DateTime();
+        Notification notif = new DefaultNotification(notificationKey, effDt);
+        dao.insertNotification(notif);
+
+        Thread.sleep(1000);
+        DateTime now = new DateTime();
+        List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+        assertNotNull(notifications);
+        assertEquals(notifications.size(), 1);
+
+        Notification notification = notifications.get(0);
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner(), null);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+        assertEquals(notification.getNextAvailableDate(), null);
+
+        DateTime nextAvailable = now.plusMinutes(5);
+        int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+        assertEquals(res, 1);
+        dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+
+        notification = fetchNotification(notification.getId().toString());
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner().toString(), ownerId);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+        validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+        dao.clearNotification(notification.getId().toString(), ownerId);
+
+        notification = fetchNotification(notification.getId().toString());
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner(), null);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+        validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+    }
+
+    private Notification fetchNotification(final String notificationId) {
+        Notification res =  dbi.withHandle(new HandleCallback<Notification>() {
+
+            @Override
+            public Notification withHandle(Handle handle) throws Exception {
+                Notification res = handle.createQuery("   select" +
+                		" notification_id" +
+                		", notification_key" +
+                		", created_dt" +
+                		", effective_dt" +
+                		", processing_owner" +
+                		", processing_available_dt" +
+                		", processing_state" +
+                		"    from notifications " +
+                		" where " +
+                		" notification_id = '" + notificationId + "';")
+                		.map(new NotificationSqlMapper())
+                		.first();
+                return res;
+            }
+        });
+        return res;
+    }
+
+    private void validateDate(DateTime input, DateTime expected) {
+        if (input == null && expected != null) {
+            Assert.fail("Got input date null");
+        }
+        if (input != null && expected == null) {
+            Assert.fail("Was expecting null date");
+        }
+        expected = truncateAndUTC(expected);
+        input = truncateAndUTC(input);
+        Assert.assertEquals(input, expected);
+    }
+
+    private DateTime truncateAndUTC(DateTime input) {
+        if (input == null) {
+            return null;
+        }
+        DateTime result = input.minus(input.getMillisOfSecond());
+        return result.toDateTime(DateTimeZone.UTC);
+    }
+
+    public static class TestNotificationSqlDaoModule extends AbstractModule {
+        @Override
+        protected void configure() {
+
+            final MysqlTestingHelper helper = new MysqlTestingHelper();
+            bind(MysqlTestingHelper.class).toInstance(helper);
+            DBI dbi = helper.getDBI();
+            bind(DBI.class).toInstance(dbi);
+
+            /*
+            bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+            */
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
new file mode 100644
index 0000000..9495001
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+public class DummyObject {
+    private final String value;
+    private final UUID key;
+
+    public DummyObject(String value, UUID key) {
+        super();
+        this.value = value;
+        this.key = key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public UUID getKey() {
+        return key;
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
new file mode 100644
index 0000000..83b1b20
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+
+@ExternalizedSqlViaStringTemplate3()
+public interface DummySqlTest extends Transactional<DummySqlTest>, Transmogrifier, CloseMe {
+
+    @SqlUpdate
+    public void insertDummy(@Bind(binder = DummySqlTestBinder.class) DummyObject dummy);
+
+    @SqlQuery
+    @Mapper(DummySqlTestMapper.class)
+    public DummyObject getDummyFromId(@Bind("dummy_id") String dummyId);
+
+    public static class DummySqlTestBinder implements Binder<Bind, DummyObject> {
+        @Override
+        public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, DummyObject dummy) {
+            stmt.bind("dummy_id", dummy.getKey().toString());
+            stmt.bind("value", dummy.getValue());
+        }
+    }
+
+    public static class DummySqlTestMapper  implements ResultSetMapper<DummyObject> {
+        @Override
+        public DummyObject map(int index, ResultSet r, StatementContext ctx)
+                throws SQLException {
+            final UUID key = UUID.fromString(r.getString("dummy_id"));
+            final String value = r.getString("value");
+            return new DummyObject(value, key);
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
new file mode 100644
index 0000000..b141310
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+
+
+    private TreeSet<Notification> notifications;
+
+    public MockNotificationQueue(final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        super(clock, svcName, queueName, handler, config);
+        notifications = new TreeSet<Notification>(new Comparator<Notification>() {
+            @Override
+            public int compare(Notification o1, Notification o2) {
+                if (o1.getEffectiveDate().equals(o2.getEffectiveDate())) {
+                    return o1.getNotificationKey().compareTo(o2.getNotificationKey());
+                } else {
+                    return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+                }
+            }
+        });
+    }
+
+    @Override
+    public void recordFutureNotificationFromTransaction(
+            Transmogrifier transactionalDao, DateTime futureNotificationTime,
+            NotificationKey notificationKey) {
+        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        synchronized(notifications) {
+            notifications.add(notification);
+        }
+    }
+
+    @Override
+    protected void doProcessEvents(int sequenceId) {
+
+        List<Notification> processedNotifications = new ArrayList<Notification>();
+        List<Notification> oldNotifications = new ArrayList<Notification>();
+
+        List<Notification> readyNotifications = new ArrayList<Notification>();
+        synchronized(notifications) {
+            Iterator<Notification> it = notifications.iterator();
+            while (it.hasNext()) {
+                Notification cur = it.next();
+                if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+                    readyNotifications.add(cur);
+                }
+            }
+            for (Notification cur : readyNotifications) {
+                handler.handleReadyNotification(cur.getNotificationKey());
+                DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+                oldNotifications.add(cur);
+                processedNotifications.add(processedNotification);
+
+            }
+            if (oldNotifications.size() > 0) {
+                notifications.removeAll(oldNotifications);
+            }
+            if (processedNotifications.size() > 0) {
+                notifications.addAll(processedNotifications);
+            }
+        }
+
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
new file mode 100644
index 0000000..2e3bb3c
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.dbi.MysqlTestingHelper;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
+public class TestNotificationQueue {
+
+    private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
+
+    @Inject
+    private DBI dbi;
+
+    @Inject
+    MysqlTestingHelper helper;
+
+    @Inject
+    private Clock clock;
+
+    private DummySqlTest dao;
+
+   // private NotificationQueue queue;
+
+    private void startMysql() throws IOException, ClassNotFoundException, SQLException {
+        final String ddl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl.sql"));
+        final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
+        helper.startMysql();
+        helper.initDb(ddl);
+        helper.initDb(testDdl);
+    }
+
+    @BeforeSuite(alwaysRun = true)
+    public void setup() throws Exception {
+        startMysql();
+        dao = dbi.onDemand(DummySqlTest.class);
+    }
+
+    @BeforeTest
+    public void beforeTest() {
+        dbi.withHandle(new HandleCallback<Void>() {
+
+            @Override
+            public Void withHandle(Handle handle) throws Exception {
+                handle.execute("delete from notifications");
+                handle.execute("delete from claimed_notifications");
+                handle.execute("delete from dummy");
+                return null;
+            }
+        });
+        // Reset time to real value
+        ((ClockMock) clock).resetDeltaFromReality();
+    }
+
+
+
+    /**
+     * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testSimpleQueueDisabled() throws InterruptedException {
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(true, 100, 1, 10000));
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+                // Do nothing
+            }
+        });
+        assertTrue(true);
+    }
+
+    /**
+     * Test that we can post a notification in the future from a transaction and get the notification
+     * callback with the correct key when the time is ready
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testSimpleNotification() throws InterruptedException {
+
+        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                        synchronized (expectedNotifications) {
+                            expectedNotifications.put(notificationKey, Boolean.TRUE);
+                            expectedNotifications.notify();
+                        }
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(false, 100, 1, 10000));
+
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+
+                final UUID key = UUID.randomUUID();
+                final DummyObject obj = new DummyObject("foo", key);
+                final DateTime now = new DateTime();
+                final DateTime readyTime = now.plusMillis(2000);
+                final NotificationKey notificationKey = new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return key.toString();
+                    }
+                };
+                expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+                // Insert dummy to be processed in 2 sec'
+                dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+                    @Override
+                    public Void inTransaction(DummySqlTest transactional,
+                            TransactionStatus status) throws Exception {
+
+                        transactional.insertDummy(obj);
+                        readyQueue.recordFutureNotificationFromTransaction(transactional,
+                                readyTime, notificationKey);
+                        return null;
+                    }
+                });
+
+                // Move time in the future after the notification effectiveDate
+                ((ClockMock) clock).setDeltaFromReality(3000);
+
+                // Notification should have kicked but give it at least a sec' for thread scheduling
+                int nbTry = 1;
+                boolean success = false;
+                do {
+                    synchronized(expectedNotifications) {
+                        if (expectedNotifications.get(notificationKey.toString())) {
+                            success = true;
+                            break;
+                        }
+                        expectedNotifications.wait(1000);
+                    }
+                } while (nbTry-- > 0);
+                assertEquals(success, true);
+            }
+        });
+    }
+
+    @Test
+    public void testManyNotifications() throws InterruptedException {
+        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                        synchronized (expectedNotifications) {
+                            expectedNotifications.put(notificationKey, Boolean.TRUE);
+                            expectedNotifications.notify();
+                        }
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(false, 100, 10, 10000));
+
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
+
+                final DateTime now = clock.getUTCNow();
+                final int MAX_NOTIFICATIONS = 100;
+                for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+                    final int nextReadyTimeIncrementMs = 1000;
+
+                    final UUID key = UUID.randomUUID();
+                    final DummyObject obj = new DummyObject("foo", key);
+                    final int currentIteration = i;
+
+                    final NotificationKey notificationKey = new NotificationKey() {
+                        @Override
+                        public String toString() {
+                            return key.toString();
+                        }
+                    };
+                    expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+                    dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+                        @Override
+                        public Void inTransaction(DummySqlTest transactional,
+                                TransactionStatus status) throws Exception {
+
+                            transactional.insertDummy(obj);
+                            readyQueue.recordFutureNotificationFromTransaction(transactional,
+                                    now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+                            return null;
+                        }
+                    });
+
+                    // Move time in the future after the notification effectiveDate
+                    if (i == 0) {
+                        ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+                    } else {
+                        ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+                    }
+                }
+
+                // Wait a little longer since there are a lot of callback that need to happen
+                int nbTry = MAX_NOTIFICATIONS + 1;
+                boolean success = false;
+                do {
+                    synchronized(expectedNotifications) {
+
+                        Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+                            @Override
+                            public boolean apply(Boolean input) {
+                                return input;
+                            }
+                        });
+
+                        if (completed.size() == MAX_NOTIFICATIONS) {
+                            success = true;
+                            break;
+                        }
+                        //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+                        expectedNotifications.wait(1000);
+                    }
+                } while (nbTry-- > 0);
+                assertEquals(success, true);
+            }
+        });
+    }
+
+
+    NotificationConfig getNotificationConfig(final boolean off,
+            final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+        return new NotificationConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return off;
+            }
+            @Override
+            public long getNotificationSleepTimeMs() {
+                return sleepTime;
+            }
+            @Override
+            public int getDaoMaxReadyEvents() {
+                return maxReadyEvents;
+            }
+            @Override
+            public long getDaoClaimTimeMs() {
+                return claimTimeMs;
+            }
+        };
+    }
+
+    private static class TestStartStop {
+        private boolean started;
+        private boolean stopped;
+
+        public TestStartStop(boolean started, boolean stopped) {
+            super();
+            this.started = started;
+            this.stopped = stopped;
+        }
+
+        public void started() {
+            synchronized(this) {
+                started = true;
+                notify();
+            }
+        }
+
+        public void stopped() {
+            synchronized(this) {
+                stopped = true;
+                notify();
+            }
+        }
+
+        public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
+            return waitForEventCompletion(timeoutMs, true);
+        }
+
+        public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
+            return waitForEventCompletion(timeoutMs, false);
+        }
+
+        private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
+            DateTime init = new DateTime();
+            synchronized(this) {
+                while (! ((start ? started : stopped))) {
+                    wait(timeoutMs);
+                    if (init.plusMillis(timeoutMs).isAfterNow()) {
+                        break;
+                    }
+                }
+            }
+            return (start ? started : stopped);
+        }
+    }
+
+    private interface WithTest {
+        public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
+    }
+
+    private void executeTest(final TestStartStop testStartStop,
+            DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
+
+        queue.startQueue();
+        boolean started = testStartStop.waitForStartComplete(3000);
+        assertEquals(started, true);
+
+        test.test(queue);
+
+        queue.stopQueue();
+        boolean stopped = testStartStop.waitForStopComplete(3000);
+        assertEquals(stopped, true);
+    }
+
+
+    public static class TestNotificationQueueModule extends AbstractModule {
+        @Override
+        protected void configure() {
+
+            bind(Clock.class).to(ClockMock.class);
+
+            final MysqlTestingHelper helper = new MysqlTestingHelper();
+            bind(MysqlTestingHelper.class).toInstance(helper);
+            DBI dbi = helper.getDBI();
+            bind(DBI.class).toInstance(dbi);
+            /*
+            bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+            */
+        }
+    }
+
+
+}
diff --git a/util/src/test/resources/com/ning/billing/util/ddl_test.sql b/util/src/test/resources/com/ning/billing/util/ddl_test.sql
new file mode 100644
index 0000000..50de498
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/ddl_test.sql
@@ -0,0 +1,6 @@
+DROP TABLE IF EXISTS dummy;
+CREATE TABLE dummy (
+    dummy_id char(36) NOT NULL,
+    value varchar(256) NOT NULL,
+    PRIMARY KEY(dummy_id)
+) ENGINE = innodb;
diff --git a/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
new file mode 100644
index 0000000..9a2e6e2
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
@@ -0,0 +1,21 @@
+group DummySqlTest;
+
+insertDummy() ::= <<
+  insert into dummy (
+    dummy_id
+    , value
+  ) values (
+    :dummy_id
+    , :value
+  );
+>>
+
+getDummyFromId(dummy_id)  ::= <<
+  select 
+    dummy_id
+    , value
+  from dummy
+  where
+    dummy_id = :dummy_id
+  ;
+>> 
\ No newline at end of file