killbill-aplcache

Changes

util/pom.xml 10(+9 -1)

util/src/main/java/com/ning/billing/util/notification/NotificationSystem.java 51(+0 -51)

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
index cfeaca7..e828e27 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
@@ -67,8 +67,6 @@ public abstract class ApiEventProcessorBase implements EventNotifier {
         this.nbProcessedEvents = 0;
     }
 
-
-
     @Override
     public void startNotifications(final EventListener listener) {
 
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index 016f005..fc4db0f 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -16,6 +16,13 @@
 
 package com.ning.billing.entitlement.engine.dao;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
 import com.google.inject.Inject;
 import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.config.EntitlementConfig;
@@ -41,7 +48,6 @@ import org.skife.jdbi.v2.TransactionStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
 
 public class EntitlementSqlDao implements EntitlementDao {
 
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index ff3a443..fe5d6a7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -93,7 +93,7 @@ public abstract class TestApiBase {
     protected ApiTestListener testListener;
     protected SubscriptionBundle bundle;
 
-    public static void loadSystemPropertiesFromClasspath( final String resource )
+    public static void loadSystemPropertiesFromClasspath(final String resource)
     {
         final URL url = TestApiBase.class.getResource(resource);
         assertNotNull(url);

util/pom.xml 10(+9 -1)

diff --git a/util/pom.xml b/util/pom.xml
index 7884db7..da7e452 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -33,6 +33,10 @@
             <artifactId>jdbi</artifactId>
         </dependency>
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
             <version>3.0</version>
@@ -69,7 +73,11 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-        
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <scope>runtime</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
new file mode 100644
index 0000000..f0babf9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+public class NotificationQueueModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
new file mode 100644
index 0000000..802d16b
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+
+@ExternalizedSqlViaStringTemplate3()
+public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
+
+    //
+    // APIs for event notifications
+    //
+    @SqlQuery
+    @Mapper(NotificationSqlMapper.class)
+    public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+
+    @SqlUpdate
+    public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+
+    @SqlUpdate
+    public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+
+    @SqlUpdate
+    public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
+
+    @SqlUpdate
+    public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner") String owner, @Bind("claimed_dt") Date clainedDate, @Bind("notification_id") String notificationId);
+
+    public static class NotificationSqlDaoBinder implements Binder<Bind, Notification> {
+
+        private Date getDate(DateTime dateTime) {
+            return dateTime == null ? null : dateTime.toDate();
+        }
+
+        @Override
+        public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
+            stmt.bind("notification_id", evt.getId().toString());
+            stmt.bind("created_dt", getDate(new DateTime()));
+            stmt.bind("notification_key", evt.getNotificationKey());
+            stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+            stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
+            stmt.bind("processing_owner", (String) null);
+            stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+        }
+    }
+
+
+    public static class NotificationSqlMapper implements ResultSetMapper<Notification> {
+
+        private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
+            final Timestamp resultStamp = r.getTimestamp(fieldName);
+            return r.wasNull() ? null : new DateTime(resultStamp).toDateTime(DateTimeZone.UTC);
+        }
+
+        @Override
+        public Notification map(int index, ResultSet r, StatementContext ctx)
+        throws SQLException {
+
+            final UUID id = UUID.fromString(r.getString("notification_id"));
+            final String notificationKey = r.getString("notification_key");
+            final DateTime effectiveDate = getDate(r, "effective_dt");
+            final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
+            final UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
+            final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+
+            return new DefaultNotification(id, processingOwner, nextAvailableDate,
+                    processingState, notificationKey, effectiveDate);
+
+        }
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
new file mode 100644
index 0000000..56a8547
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class DefaultNotification implements Notification {
+
+    private final UUID id;
+    private final UUID owner;
+    private final DateTime nextAvailableDate;
+    private final NotificationLifecycleState lifecycleState;
+    private final String notificationKey;
+    private final DateTime effectiveDate;
+
+
+    public DefaultNotification(UUID id, UUID owner, DateTime nextAvailableDate,
+            NotificationLifecycleState lifecycleState,
+            String notificationKey, DateTime effectiveDate) {
+        super();
+        this.id = id;
+        this.owner = owner;
+        this.nextAvailableDate = nextAvailableDate;
+        this.lifecycleState = lifecycleState;
+        this.notificationKey = notificationKey;
+        this.effectiveDate = effectiveDate;
+    }
+
+    public DefaultNotification(String notificationKey, DateTime effectiveDate) {
+        this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    }
+
+    @Override
+    public UUID getId() {
+        return id;
+    }
+
+    @Override
+    public UUID getOwner() {
+        return owner;
+    }
+
+    @Override
+    public DateTime getNextAvailableDate() {
+        return nextAvailableDate;
+    }
+
+    @Override
+    public NotificationLifecycleState getProcessingState() {
+        return lifecycleState;
+    }
+
+    @Override
+    public boolean isAvailableForProcessing(DateTime now) {
+        switch(lifecycleState) {
+        case AVAILABLE:
+            break;
+        case IN_PROCESSING:
+            // Somebody already got the event, not available yet
+            if (nextAvailableDate.isAfter(now)) {
+                return false;
+            }
+            break;
+        case PROCESSED:
+            return false;
+        default:
+            throw new RuntimeException(String.format("Unkwnon IEvent processing state %s", lifecycleState));
+        }
+        return effectiveDate.isBefore(now);
+    }
+
+    @Override
+    public String getNotificationKey() {
+        return notificationKey;
+    }
+
+    @Override
+    public DateTime getEffectiveDate() {
+        return effectiveDate;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
new file mode 100644
index 0000000..6685e05
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.clock.Clock;
+
+public class DefaultNotificationQueueService implements NotificationQueueService {
+
+    private final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+    private final DBI dbi;
+    private final Clock clock;
+
+    private final Map<String, NotificationQueue> queues;
+
+    public DefaultNotificationQueueService(final DBI dbi, final Clock clock) {
+        this.dbi = dbi;
+        this.clock = clock;
+        this.queues = new TreeMap<String, NotificationQueue>();
+    }
+
+    @Override
+    public NotificationQueue createNotificationQueue(String svcName,
+            String queueName, NotificationQueueHandler handler,
+            NotificationConfig config) {
+        if (svcName == null || queueName == null || handler == null || config == null) {
+            throw new RuntimeException("Need to specify all parameters");
+        }
+
+        String compositeName = svcName + ":" + queueName;
+        NotificationQueue result = null;
+        synchronized(queues) {
+            result = queues.get(compositeName);
+            if (result == null) {
+                result = new NotificationQueue(dbi, clock, svcName, queueName, handler, config);
+                queues.put(compositeName, result);
+            } else {
+                log.warn("Queue for svc {} and name {} already exist", svcName, queueName);
+            }
+        }
+        return result;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
new file mode 100644
index 0000000..bfe4aa9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+/**
+ *
+ * The notification key associated with a given notification
+ */
+public interface NotificationKey {
+
+    @Override
+    public String toString();
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
new file mode 100644
index 0000000..7c81108
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+
+public interface NotificationLifecycle {
+
+    public enum NotificationLifecycleState {
+        AVAILABLE,
+        IN_PROCESSING,
+        PROCESSED
+    }
+
+    public UUID getOwner();
+
+    //public void setOwner(UUID owner);
+
+    public DateTime getNextAvailableDate();
+
+    //public void setNextAvailableDate(DateTime dateTime);
+
+    public NotificationLifecycleState getProcessingState();
+
+    //public void setProcessingState(NotificationLifecycleState procesingState);
+
+    public boolean isAvailableForProcessing(DateTime now);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
new file mode 100644
index 0000000..5677335
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public class NotificationQueue {
+
+    protected final static Logger log = LoggerFactory.getLogger(NotificationQueue.class);
+
+    private static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+    private final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+    private final String svcName;
+    private final String queueName;
+    private final NotificationQueueHandler handler;
+    private final NotificationConfig config;
+    private final NotificationSqlDao dao;
+    private final Executor executor;
+    private final Clock clock;
+    private final String hostname;
+
+    private static final AtomicInteger sequenceId = new AtomicInteger();
+
+    protected AtomicLong nbProcessedEvents;
+
+    // Use this object's monitor for synchronization (no need for volatile)
+    protected boolean isProcessingEvents;
+
+    // Package visibility on purpose
+    NotificationQueue(final DBI dbi, final Clock clock,  final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+        this.clock = clock;
+        this.svcName = svcName;
+        this.queueName = queueName;
+        this.handler = handler;
+        this.config = config;
+        this.hostname = Hostname.get();
+
+        this.dao = dbi.onDemand(NotificationSqlDao.class);
+
+        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread th = new Thread(r);
+                th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+                th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+                    @Override
+                    public void uncaughtException(Thread t, Throwable e) {
+                        log.error("Uncaught exception for thread " + t.getName(), e);
+                    }
+                });
+                return th;
+            }
+        });
+    }
+
+    /**
+     *
+     *  Record from within a transaction the need to be called back when the notification is ready
+     *
+     * @param transactionalDao the transactionalDao
+     * @param futureNotificationTime the time at which the notificatoin is ready
+     * @param notificationKey the key for that notification
+     */
+    public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+            final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+        NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
+        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        transactionalNotificationDao.insertNotification(notification);
+    }
+
+    /**
+     * Stops the queue.
+     *
+     * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+     */
+    public void stopQueue() {
+        if (config.isNotificationProcessingOff()) {
+            handler.completedQueueStop();
+            return;
+        }
+
+        synchronized(this) {
+            isProcessingEvents = false;
+            try {
+                log.info("NotificationQueue requested to stop");
+                wait(STOP_WAIT_TIMEOUT_MS);
+                log.info("NotificationQueue requested should have exited");
+            } catch (InterruptedException e) {
+                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+            }
+        }
+
+    }
+
+    /**
+     * Starts the queue.
+     *
+     * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+     */
+    public void startQueue() {
+
+        this.isProcessingEvents = true;
+        this.nbProcessedEvents = new AtomicLong();
+
+
+        if (config.isNotificationProcessingOff()) {
+            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+            handler.completedQueueStart();
+            return;
+        }
+        final NotificationQueue notificationQueue = this;
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+
+                log.info(String.format("NotificationQueue thread %s [%d] started",
+                        Thread.currentThread().getName(),
+                        Thread.currentThread().getId()));
+
+                // Thread is now started, notify the listener
+                handler.completedQueueStart();
+
+                try {
+                    while (true) {
+
+                        synchronized (notificationQueue) {
+                            if (!isProcessingEvents) {
+                                log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...",
+                                        Thread.currentThread().getName(),
+                                        Thread.currentThread().getId()));
+                                notificationQueue.notify();
+                                break;
+                            }
+                        }
+
+                        // Callback may trigger exceptions in user code so catch anything here and live with it.
+                        try {
+                            doProcessEvents(sequenceId.getAndIncrement());
+                        } catch (Exception e) {
+                            log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..",
+                                    Thread.currentThread().getName(),
+                                    Thread.currentThread().getId()), e);
+                        }
+                        sleepALittle();
+                    }
+                } catch (InterruptedException e) {
+                    log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+                } catch (Throwable e) {
+                    log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+                    // Just to make it really obvious in the log
+                    e.printStackTrace();
+                } finally {
+                    handler.completedQueueStop();
+                    log.info(String.format("NotificationQueue thread  %s  [%d] exited...",
+                            Thread.currentThread().getName(),
+                            Thread.currentThread().getId()));
+                }
+            }
+
+            private void sleepALittle() throws InterruptedException {
+                Thread.sleep(config.getNotificationSleepTimeMs());
+            }
+        });
+    }
+
+    private void doProcessEvents(int sequenceId) {
+        List<Notification> notifications = getReadyNotifications(sequenceId);
+        for (Notification cur : notifications) {
+            nbProcessedEvents.incrementAndGet();
+            handler.handleReadyNotification(cur.getNotificationKey());
+        }
+        // If anything happens before we get to clear those notifications, somebody else will pick them up
+        clearNotifications(notifications);
+    }
+
+    private String getFullQName() {
+        return svcName + ":" +  queueName;
+    }
+
+    private void clearNotifications(final Collection<Notification> cleared) {
+
+        log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+                getFullQName(),
+                cleared.size()));
+
+        dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+            @Override
+            public Void inTransaction(NotificationSqlDao transactional,
+                    TransactionStatus status) throws Exception {
+                for (Notification cur : cleared) {
+                    transactional.clearNotification(cur.getId().toString(), hostname);
+                    log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+                }
+                return null;
+            }
+        });
+    }
+
+    private List<Notification> getReadyNotifications(final int seqId) {
+
+        final Date now = clock.getUTCNow().toDate();
+        final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+        log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow =  %s",  getFullQName(), now));
+
+        List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+            @Override
+            public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+                    TransactionStatus status) throws Exception {
+
+                List<Notification> claimedNotifications = new ArrayList<Notification>();
+                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+                for (Notification cur : input) {
+                    final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+                    if (claimed) {
+                        claimedNotifications.add(cur);
+                        transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+                    }
+                }
+                return claimedNotifications;
+            }
+        });
+
+        for (Notification cur : result) {
+            log.debug(String.format("NotificationQueue %sclaimed events %s",
+                    getFullQName(), cur.getId()));
+            if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+                log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+                        getFullQName(), cur, cur.getOwner()));
+            }
+        }
+        return result;
+    }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
new file mode 100644
index 0000000..d1544c8
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+
+public interface NotificationQueueService {
+
+    public interface NotificationQueueHandler {
+        /**
+         * Called when the Notification thread has been started
+         */
+        public void completedQueueStart();
+
+        /**
+         * Called for each notification ready
+         *
+         * @param key the notification key associated to that notification entry
+         */
+        public void handleReadyNotification(String notificationKey);
+        /**
+         * Called right before the Notification thread is about to exit
+         */
+        public void completedQueueStop();
+    }
+
+    /**
+     * Creates a new NotificationQueue for a given associated with the given service and queueName
+     *
+     * @param svcName the name of the service using that queue
+     * @param queueName a name for that queue (unique per service)
+     * @param handler the handler required for notifying the caller of state change
+     * @param config the notification queue configuration
+     *
+     * @return a new NotificationQueue
+     */
+    NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config);
+}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index c68b18b..30471c7 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -34,4 +34,31 @@ CREATE TABLE tags (
   PRIMARY KEY(id)
 ) ENGINE = innodb;
 CREATE INDEX tags_by_object ON tags(object_id);
-CREATE UNIQUE INDEX tags_unique ON tags(tag_description_id, object_id);
\ No newline at end of file
+CREATE UNIQUE INDEX tags_unique ON tags(tag_description_id, object_id);
+
+DROP TABLE IF EXISTS notifications;
+CREATE TABLE notifications (
+    id int(11) unsigned NOT NULL AUTO_INCREMENT,
+    notification_id char(36) NOT NULL,
+    created_dt datetime NOT NULL,
+	notification_key varchar(256) NOT NULL,
+    effective_dt datetime NOT NULL,
+    processing_owner char(36) DEFAULT NULL,
+    processing_available_dt datetime DEFAULT NULL,
+    processing_state varchar(14) DEFAULT 'AVAILABLE',
+    PRIMARY KEY(id)
+) ENGINE=innodb;
+CREATE INDEX  `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX  `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX  `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX  `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
+
+DROP TABLE IF EXISTS claimed_notifications;
+CREATE TABLE claimed_notifications (
+    id int(11) unsigned NOT NULL AUTO_INCREMENT,    
+    sequence_id int(11) unsigned NOT NULL,    
+    owner_id varchar(64) NOT NULL,
+    claimed_dt datetime NOT NULL,
+    notification_id char(36) NOT NULL,
+    PRIMARY KEY(id)
+) ENGINE=innodb;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
new file mode 100644
index 0000000..5a44431
--- /dev/null
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -0,0 +1,83 @@
+group NotificationSqlDao;
+
+getReadyNotifications(now, max) ::= <<
+    select
+      notification_id
+    , notification_key
+      , created_dt
+      , effective_dt
+      , processing_owner
+      , processing_available_dt
+      , processing_state
+    from notifications
+    where
+      effective_dt \<= :now
+      and processing_state != 'PROCESSED'
+      and (processing_owner IS NULL OR processing_available_dt \<= :now)
+    order by
+      effective_dt asc
+      , created_dt asc
+      , id asc
+    limit :max
+    ;
+>>
+
+
+claimNotification(owner, next_available, notification_id, now) ::= <<
+    update notifications
+    set
+      processing_owner = :owner
+      , processing_available_dt = :next_available
+      , processing_state = 'IN_PROCESSING'
+    where
+      notification_id = :notification_id
+      and processing_state != 'PROCESSED'
+      and (processing_owner IS NULL OR processing_available_dt \<= :now)
+    ;
+>>
+
+clearNotification(notification_id, owner) ::= <<
+    update notifications
+    set
+      processing_owner = NULL
+      , processing_state = 'PROCESSED'
+    where
+      notification_id = :notification_id
+      and processing_owner = :owner
+    ;
+>>
+
+insertNotification() ::= <<
+    insert into notifications (
+      notification_id
+    , notification_key
+      , created_dt
+      , effective_dt
+      , processing_owner
+      , processing_available_dt
+      , processing_state
+    ) values (
+      :notification_id
+      , :notification_key
+      , :created_dt
+      , :effective_dt
+      , :processing_owner
+      , :processing_available_dt
+      , :processing_state
+    );   
+>>
+
+
+insertClaimedHistory(sequence_id, owner, hostname, claimed_dt, notification_id) ::= <<
+    insert into claimed_notifications (
+        sequence_id
+        , owner_id
+        , claimed_dt
+        , notification_id
+      ) values (
+        :sequence_id
+        , :owner
+        , :claimed_dt
+        , :notification_id
+      );
+>>
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index 0fb473d..7698697 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -76,6 +76,15 @@ public class ClockMock extends DefaultClock {
         deltaFromRealityMs = delta;
     }
 
+    public synchronized void addDeltaFromReality(long delta) {
+        if (deltaType != DeltaType.DELTA_ABS) {
+            throw new RuntimeException("ClockMock should be set with type DELTA_ABS");
+        }
+        deltaFromRealityDuration = null;
+        deltaFromRealitDurationEpsilon = 0;
+        deltaFromRealityMs += delta;
+    }
+
     public synchronized void resetDeltaFromReality() {
         deltaType = DeltaType.DELTA_NONE;
         deltaFromRealityDuration = null;
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
new file mode 100644
index 0000000..bfd29ef
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+
+@Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
+public class TestNotificationSqlDao {
+
+    private static AtomicInteger sequenceId = new AtomicInteger();
+
+    @Inject
+    private DBI dbi;
+
+    private NotificationSqlDao dao;
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+       dao = dbi.onDemand(NotificationSqlDao.class);
+    }
+
+
+    @BeforeTest
+    public void cleanupDb() {
+        dbi.withHandle(new HandleCallback<Void>() {
+
+            @Override
+            public Void withHandle(Handle handle) throws Exception {
+                handle.execute("delete from notifications");
+                handle.execute("delete from claimed_notifications");
+                return null;
+            }
+        });
+    }
+
+    @Test
+    public void testBasic() throws InterruptedException {
+
+        final String ownerId = UUID.randomUUID().toString();
+
+        String notificationKey = UUID.randomUUID().toString();
+        DateTime effDt = new DateTime();
+        Notification notif = new DefaultNotification(notificationKey, effDt);
+        dao.insertNotification(notif);
+
+        Thread.sleep(1000);
+        DateTime now = new DateTime();
+        List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+        assertNotNull(notifications);
+        assertEquals(notifications.size(), 1);
+
+        Notification notification = notifications.get(0);
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner(), null);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+        assertEquals(notification.getNextAvailableDate(), null);
+
+        DateTime nextAvailable = now.plusMinutes(5);
+        int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+        assertEquals(res, 1);
+        dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+
+        notification = fetchNotification(notification.getId().toString());
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner().toString(), ownerId);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+        validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+        dao.clearNotification(notification.getId().toString(), ownerId);
+
+        notification = fetchNotification(notification.getId().toString());
+        assertEquals(notification.getNotificationKey(), notificationKey);
+        validateDate(notification.getEffectiveDate(), effDt);
+        assertEquals(notification.getOwner(), null);
+        assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+        validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+    }
+
+    private Notification fetchNotification(final String notificationId) {
+        Notification res =  dbi.withHandle(new HandleCallback<Notification>() {
+
+            @Override
+            public Notification withHandle(Handle handle) throws Exception {
+                Notification res = handle.createQuery("   select" +
+                		" notification_id" +
+                		", notification_key" +
+                		", created_dt" +
+                		", effective_dt" +
+                		", processing_owner" +
+                		", processing_available_dt" +
+                		", processing_state" +
+                		"    from notifications " +
+                		" where " +
+                		" notification_id = '" + notificationId + "';")
+                		.map(new NotificationSqlMapper())
+                		.first();
+                return res;
+            }
+        });
+        return res;
+    }
+
+    private void validateDate(DateTime input, DateTime expected) {
+        if (input == null && expected != null) {
+            Assert.fail("Got input date null");
+        }
+        if (input != null && expected == null) {
+            Assert.fail("Was expecting null date");
+        }
+        expected = truncateAndUTC(expected);
+        input = truncateAndUTC(input);
+        Assert.assertEquals(input, expected);
+    }
+
+    private DateTime truncateAndUTC(DateTime input) {
+        if (input == null) {
+            return null;
+        }
+        DateTime result = input.minus(input.getMillisOfSecond());
+        return result.toDateTime(DateTimeZone.UTC);
+    }
+
+    public static class TestNotificationSqlDaoModule extends AbstractModule {
+        @Override
+        protected void configure() {
+            bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
new file mode 100644
index 0000000..9495001
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+public class DummyObject {
+    private final String value;
+    private final UUID key;
+
+    public DummyObject(String value, UUID key) {
+        super();
+        this.value = value;
+        this.key = key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public UUID getKey() {
+        return key;
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
new file mode 100644
index 0000000..83b1b20
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+
+@ExternalizedSqlViaStringTemplate3()
+public interface DummySqlTest extends Transactional<DummySqlTest>, Transmogrifier, CloseMe {
+
+    @SqlUpdate
+    public void insertDummy(@Bind(binder = DummySqlTestBinder.class) DummyObject dummy);
+
+    @SqlQuery
+    @Mapper(DummySqlTestMapper.class)
+    public DummyObject getDummyFromId(@Bind("dummy_id") String dummyId);
+
+    public static class DummySqlTestBinder implements Binder<Bind, DummyObject> {
+        @Override
+        public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, DummyObject dummy) {
+            stmt.bind("dummy_id", dummy.getKey().toString());
+            stmt.bind("value", dummy.getValue());
+        }
+    }
+
+    public static class DummySqlTestMapper  implements ResultSetMapper<DummyObject> {
+        @Override
+        public DummyObject map(int index, ResultSet r, StatementContext ctx)
+                throws SQLException {
+            final UUID key = UUID.fromString(r.getString("dummy_id"));
+            final String value = r.getString("value");
+            return new DummyObject(value, key);
+        }
+    }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
new file mode 100644
index 0000000..5703da1
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
+public class TestNotificationQueue {
+
+    private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
+
+    @Inject
+    private DBI dbi;
+
+    @Inject
+    private Clock clock;
+
+    private DummySqlTest dao;
+
+   // private NotificationQueue queue;
+
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+        dao = dbi.onDemand(DummySqlTest.class);
+    }
+
+    @BeforeTest
+    public void beforeTest() {
+        dbi.withHandle(new HandleCallback<Void>() {
+
+            @Override
+            public Void withHandle(Handle handle) throws Exception {
+                handle.execute("delete from notifications");
+                handle.execute("delete from claimed_notifications");
+                handle.execute("delete from dummy");
+                return null;
+            }
+        });
+        // Reset time to real value
+        ((ClockMock) clock).resetDeltaFromReality();
+    }
+
+    @AfterTest
+    public void afterTest() {
+
+    }
+
+    /**
+     * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testSimpleQueueDisabled() throws InterruptedException {
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "dead",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(true, 100, 1, 10000));
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+                // Do nothing
+            }
+        });
+        assertTrue(true);
+    }
+
+    /**
+     * Test that we can post a notification in the future from a transaction and get the notification
+     * callback with the correct key when the time is ready
+     *
+     * @throws InterruptedException
+     */
+    @Test
+    public void testSimpleNotification() throws InterruptedException {
+
+        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "foo",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                        synchronized (expectedNotifications) {
+                            expectedNotifications.put(notificationKey, Boolean.TRUE);
+                            expectedNotifications.notify();
+                        }
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(false, 100, 1, 10000));
+
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+
+                final UUID key = UUID.randomUUID();
+                final DummyObject obj = new DummyObject("foo", key);
+                final DateTime now = new DateTime();
+                final DateTime readyTime = now.plusMillis(2000);
+                final NotificationKey notificationKey = new NotificationKey() {
+                    @Override
+                    public String toString() {
+                        return key.toString();
+                    }
+                };
+                expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+                // Insert dummy to be processed in 2 sec'
+                dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+                    @Override
+                    public Void inTransaction(DummySqlTest transactional,
+                            TransactionStatus status) throws Exception {
+
+                        transactional.insertDummy(obj);
+                        readyQueue.recordFutureNotificationFromTransaction(transactional,
+                                readyTime, notificationKey);
+                        return null;
+                    }
+                });
+
+                // Move time in the future after the notification effectiveDate
+                ((ClockMock) clock).setDeltaFromReality(3000);
+
+                // Notification should have kicked but give it at least a sec' for thread scheduling
+                int nbTry = 1;
+                boolean success = false;
+                do {
+                    synchronized(expectedNotifications) {
+                        if (expectedNotifications.get(notificationKey.toString())) {
+                            success = true;
+                            break;
+                        }
+                        expectedNotifications.wait(1000);
+                    }
+                } while (nbTry-- > 0);
+                assertEquals(success, true);
+            }
+        });
+    }
+
+    @Test
+    public void testManyNotifications() throws InterruptedException {
+        final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+        final TestStartStop testStartStop = new TestStartStop(false, false);
+        NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "many",
+                new NotificationQueueHandler() {
+                    @Override
+                    public void handleReadyNotification(String notificationKey) {
+                        synchronized (expectedNotifications) {
+                            expectedNotifications.put(notificationKey, Boolean.TRUE);
+                            expectedNotifications.notify();
+                        }
+                    }
+                    @Override
+                    public void completedQueueStop() {
+                        testStartStop.stopped();
+                    }
+                    @Override
+                    public void completedQueueStart() {
+                        testStartStop.started();
+                    }
+                },
+                getNotificationConfig(false, 100, 10, 10000));
+
+
+        executeTest(testStartStop, queue, new WithTest() {
+            @Override
+            public void test(final NotificationQueue readyQueue) throws InterruptedException {
+
+                final DateTime now = clock.getUTCNow();
+                final int MAX_NOTIFICATIONS = 100;
+                for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+                    final int nextReadyTimeIncrementMs = 1000;
+
+                    final UUID key = UUID.randomUUID();
+                    final DummyObject obj = new DummyObject("foo", key);
+                    final int currentIteration = i;
+
+                    final NotificationKey notificationKey = new NotificationKey() {
+                        @Override
+                        public String toString() {
+                            return key.toString();
+                        }
+                    };
+                    expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+                    dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+                        @Override
+                        public Void inTransaction(DummySqlTest transactional,
+                                TransactionStatus status) throws Exception {
+
+                            transactional.insertDummy(obj);
+                            readyQueue.recordFutureNotificationFromTransaction(transactional,
+                                    now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+                            return null;
+                        }
+                    });
+
+                    // Move time in the future after the notification effectiveDate
+                    if (i == 0) {
+                        ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+                    } else {
+                        ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+                    }
+                }
+
+                // Wait a little longer since there are a lot of callback that need to happen
+                int nbTry = MAX_NOTIFICATIONS + 1;
+                boolean success = false;
+                do {
+                    synchronized(expectedNotifications) {
+
+                        Collection<Boolean> completed =  Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+                            @Override
+                            public boolean apply(Boolean input) {
+                                return input;
+                            }
+                        });
+
+                        if (completed.size() == MAX_NOTIFICATIONS) {
+                            success = true;
+                            break;
+                        }
+                        //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+                        expectedNotifications.wait(1000);
+                    }
+                } while (nbTry-- > 0);
+                assertEquals(success, true);
+            }
+        });
+    }
+
+
+    NotificationConfig getNotificationConfig(final boolean off,
+            final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+        return new NotificationConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return off;
+            }
+            @Override
+            public long getNotificationSleepTimeMs() {
+                return sleepTime;
+            }
+            @Override
+            public int getDaoMaxReadyEvents() {
+                return maxReadyEvents;
+            }
+            @Override
+            public long getDaoClaimTimeMs() {
+                return claimTimeMs;
+            }
+        };
+    }
+
+    private static class TestStartStop {
+        private boolean started;
+        private boolean stopped;
+
+        public TestStartStop(boolean started, boolean stopped) {
+            super();
+            this.started = started;
+            this.stopped = stopped;
+        }
+
+        public void started() {
+            synchronized(this) {
+                started = true;
+                notify();
+            }
+        }
+
+        public void stopped() {
+            synchronized(this) {
+                stopped = true;
+                notify();
+            }
+        }
+
+        public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
+            return waitForEventCompletion(timeoutMs, true);
+        }
+
+        public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
+            return waitForEventCompletion(timeoutMs, false);
+        }
+
+        private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
+            DateTime init = new DateTime();
+            synchronized(this) {
+                while (! ((start ? started : stopped))) {
+                    wait(timeoutMs);
+                    if (init.plusMillis(timeoutMs).isAfterNow()) {
+                        break;
+                    }
+                }
+            }
+            return (start ? started : stopped);
+        }
+    }
+
+    private interface WithTest {
+        public void test(NotificationQueue readyQueue) throws InterruptedException;
+    }
+
+    private void executeTest(final TestStartStop testStartStop,
+            NotificationQueue queue, WithTest test) throws InterruptedException{
+
+        queue.startQueue();
+        boolean started = testStartStop.waitForStartComplete(3000);
+        assertEquals(started, true);
+
+        test.test(queue);
+
+        queue.stopQueue();
+        boolean stopped = testStartStop.waitForStopComplete(3000);
+        assertEquals(stopped, true);
+    }
+
+
+    public static class TestNotificationQueueModule extends AbstractModule {
+        @Override
+        protected void configure() {
+            bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+            final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+            bind(DbiConfig.class).toInstance(config);
+            bind(Clock.class).to(ClockMock.class);
+        }
+    }
+
+
+}
diff --git a/util/src/test/resources/com/ning/billing/util/ddl.sql b/util/src/test/resources/com/ning/billing/util/ddl.sql
new file mode 100644
index 0000000..50de498
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/ddl.sql
@@ -0,0 +1,6 @@
+DROP TABLE IF EXISTS dummy;
+CREATE TABLE dummy (
+    dummy_id char(36) NOT NULL,
+    value varchar(256) NOT NULL,
+    PRIMARY KEY(dummy_id)
+) ENGINE = innodb;
diff --git a/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
new file mode 100644
index 0000000..9a2e6e2
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
@@ -0,0 +1,21 @@
+group DummySqlTest;
+
+insertDummy() ::= <<
+  insert into dummy (
+    dummy_id
+    , value
+  ) values (
+    :dummy_id
+    , :value
+  );
+>>
+
+getDummyFromId(dummy_id)  ::= <<
+  select 
+    dummy_id
+    , value
+  from dummy
+  where
+    dummy_id = :dummy_id
+  ;
+>> 
\ No newline at end of file