killbill-aplcache
Changes
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 2(+0 -2)
util/pom.xml 10(+9 -1)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 64(+64 -0)
Details
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
index cfeaca7..e828e27 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java
@@ -67,8 +67,6 @@ public abstract class ApiEventProcessorBase implements EventNotifier {
this.nbProcessedEvents = 0;
}
-
-
@Override
public void startNotifications(final EventListener listener) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index 016f005..fc4db0f 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -16,6 +16,13 @@
package com.ning.billing.entitlement.engine.dao;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
import com.google.inject.Inject;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.config.EntitlementConfig;
@@ -41,7 +48,6 @@ import org.skife.jdbi.v2.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
public class EntitlementSqlDao implements EntitlementDao {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index ff3a443..fe5d6a7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -93,7 +93,7 @@ public abstract class TestApiBase {
protected ApiTestListener testListener;
protected SubscriptionBundle bundle;
- public static void loadSystemPropertiesFromClasspath( final String resource )
+ public static void loadSystemPropertiesFromClasspath(final String resource)
{
final URL url = TestApiBase.class.getResource(resource);
assertNotNull(url);
util/pom.xml 10(+9 -1)
diff --git a/util/pom.xml b/util/pom.xml
index 7884db7..da7e452 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -33,6 +33,10 @@
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
@@ -69,7 +73,11 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>stringtemplate</artifactId>
+ <scope>runtime</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
new file mode 100644
index 0000000..f0babf9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.glue;
+
+import com.google.inject.AbstractModule;
+import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
+public class NotificationQueueModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
new file mode 100644
index 0000000..802d16b
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+
+@ExternalizedSqlViaStringTemplate3()
+public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, CloseMe {
+
+ //
+ // APIs for event notifications
+ //
+ @SqlQuery
+ @Mapper(NotificationSqlMapper.class)
+ public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+
+ @SqlUpdate
+ public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
+
+ @SqlUpdate
+ public void clearNotification(@Bind("notification_id") String eventId, @Bind("owner") String owner);
+
+ @SqlUpdate
+ public void insertNotification(@Bind(binder = NotificationSqlDaoBinder.class) Notification evt);
+
+ @SqlUpdate
+ public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner") String owner, @Bind("claimed_dt") Date clainedDate, @Bind("notification_id") String notificationId);
+
+ public static class NotificationSqlDaoBinder implements Binder<Bind, Notification> {
+
+ private Date getDate(DateTime dateTime) {
+ return dateTime == null ? null : dateTime.toDate();
+ }
+
+ @Override
+ public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, Notification evt) {
+ stmt.bind("notification_id", evt.getId().toString());
+ stmt.bind("created_dt", getDate(new DateTime()));
+ stmt.bind("notification_key", evt.getNotificationKey());
+ stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+ stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
+ stmt.bind("processing_owner", (String) null);
+ stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
+ }
+ }
+
+
+ public static class NotificationSqlMapper implements ResultSetMapper<Notification> {
+
+ private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
+ final Timestamp resultStamp = r.getTimestamp(fieldName);
+ return r.wasNull() ? null : new DateTime(resultStamp).toDateTime(DateTimeZone.UTC);
+ }
+
+ @Override
+ public Notification map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException {
+
+ final UUID id = UUID.fromString(r.getString("notification_id"));
+ final String notificationKey = r.getString("notification_key");
+ final DateTime effectiveDate = getDate(r, "effective_dt");
+ final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
+ final UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
+ final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
+
+ return new DefaultNotification(id, processingOwner, nextAvailableDate,
+ processingState, notificationKey, effectiveDate);
+
+ }
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
new file mode 100644
index 0000000..56a8547
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+public class DefaultNotification implements Notification {
+
+ private final UUID id;
+ private final UUID owner;
+ private final DateTime nextAvailableDate;
+ private final NotificationLifecycleState lifecycleState;
+ private final String notificationKey;
+ private final DateTime effectiveDate;
+
+
+ public DefaultNotification(UUID id, UUID owner, DateTime nextAvailableDate,
+ NotificationLifecycleState lifecycleState,
+ String notificationKey, DateTime effectiveDate) {
+ super();
+ this.id = id;
+ this.owner = owner;
+ this.nextAvailableDate = nextAvailableDate;
+ this.lifecycleState = lifecycleState;
+ this.notificationKey = notificationKey;
+ this.effectiveDate = effectiveDate;
+ }
+
+ public DefaultNotification(String notificationKey, DateTime effectiveDate) {
+ this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ }
+
+ @Override
+ public UUID getId() {
+ return id;
+ }
+
+ @Override
+ public UUID getOwner() {
+ return owner;
+ }
+
+ @Override
+ public DateTime getNextAvailableDate() {
+ return nextAvailableDate;
+ }
+
+ @Override
+ public NotificationLifecycleState getProcessingState() {
+ return lifecycleState;
+ }
+
+ @Override
+ public boolean isAvailableForProcessing(DateTime now) {
+ switch(lifecycleState) {
+ case AVAILABLE:
+ break;
+ case IN_PROCESSING:
+ // Somebody already got the event, not available yet
+ if (nextAvailableDate.isAfter(now)) {
+ return false;
+ }
+ break;
+ case PROCESSED:
+ return false;
+ default:
+ throw new RuntimeException(String.format("Unkwnon IEvent processing state %s", lifecycleState));
+ }
+ return effectiveDate.isBefore(now);
+ }
+
+ @Override
+ public String getNotificationKey() {
+ return notificationKey;
+ }
+
+ @Override
+ public DateTime getEffectiveDate() {
+ return effectiveDate;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
new file mode 100644
index 0000000..6685e05
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.skife.jdbi.v2.DBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.clock.Clock;
+
+public class DefaultNotificationQueueService implements NotificationQueueService {
+
+ private final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+ private final DBI dbi;
+ private final Clock clock;
+
+ private final Map<String, NotificationQueue> queues;
+
+ public DefaultNotificationQueueService(final DBI dbi, final Clock clock) {
+ this.dbi = dbi;
+ this.clock = clock;
+ this.queues = new TreeMap<String, NotificationQueue>();
+ }
+
+ @Override
+ public NotificationQueue createNotificationQueue(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config) {
+ if (svcName == null || queueName == null || handler == null || config == null) {
+ throw new RuntimeException("Need to specify all parameters");
+ }
+
+ String compositeName = svcName + ":" + queueName;
+ NotificationQueue result = null;
+ synchronized(queues) {
+ result = queues.get(compositeName);
+ if (result == null) {
+ result = new NotificationQueue(dbi, clock, svcName, queueName, handler, config);
+ queues.put(compositeName, result);
+ } else {
+ log.warn("Queue for svc {} and name {} already exist", svcName, queueName);
+ }
+ }
+ return result;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
new file mode 100644
index 0000000..bfe4aa9
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+/**
+ *
+ * The notification key associated with a given notification
+ */
+public interface NotificationKey {
+
+ @Override
+ public String toString();
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
new file mode 100644
index 0000000..7c81108
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+
+
+public interface NotificationLifecycle {
+
+ public enum NotificationLifecycleState {
+ AVAILABLE,
+ IN_PROCESSING,
+ PROCESSED
+ }
+
+ public UUID getOwner();
+
+ //public void setOwner(UUID owner);
+
+ public DateTime getNextAvailableDate();
+
+ //public void setNextAvailableDate(DateTime dateTime);
+
+ public NotificationLifecycleState getProcessingState();
+
+ //public void setProcessingState(NotificationLifecycleState procesingState);
+
+ public boolean isAvailableForProcessing(DateTime now);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
new file mode 100644
index 0000000..5677335
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public class NotificationQueue {
+
+ protected final static Logger log = LoggerFactory.getLogger(NotificationQueue.class);
+
+ private static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+ private final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+ private final String svcName;
+ private final String queueName;
+ private final NotificationQueueHandler handler;
+ private final NotificationConfig config;
+ private final NotificationSqlDao dao;
+ private final Executor executor;
+ private final Clock clock;
+ private final String hostname;
+
+ private static final AtomicInteger sequenceId = new AtomicInteger();
+
+ protected AtomicLong nbProcessedEvents;
+
+ // Use this object's monitor for synchronization (no need for volatile)
+ protected boolean isProcessingEvents;
+
+ // Package visibility on purpose
+ NotificationQueue(final DBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ this.clock = clock;
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.config = config;
+ this.hostname = Hostname.get();
+
+ this.dao = dbi.onDemand(NotificationSqlDao.class);
+
+ this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread th = new Thread(r);
+ th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+ th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("Uncaught exception for thread " + t.getName(), e);
+ }
+ });
+ return th;
+ }
+ });
+ }
+
+ /**
+ *
+ * Record from within a transaction the need to be called back when the notification is ready
+ *
+ * @param transactionalDao the transactionalDao
+ * @param futureNotificationTime the time at which the notificatoin is ready
+ * @param notificationKey the key for that notification
+ */
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+ final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+ NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
+ Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ transactionalNotificationDao.insertNotification(notification);
+ }
+
+ /**
+ * Stops the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+ */
+ public void stopQueue() {
+ if (config.isNotificationProcessingOff()) {
+ handler.completedQueueStop();
+ return;
+ }
+
+ synchronized(this) {
+ isProcessingEvents = false;
+ try {
+ log.info("NotificationQueue requested to stop");
+ wait(STOP_WAIT_TIMEOUT_MS);
+ log.info("NotificationQueue requested should have exited");
+ } catch (InterruptedException e) {
+ log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+ }
+ }
+
+ }
+
+ /**
+ * Starts the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+ */
+ public void startQueue() {
+
+ this.isProcessingEvents = true;
+ this.nbProcessedEvents = new AtomicLong();
+
+
+ if (config.isNotificationProcessingOff()) {
+ log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+ handler.completedQueueStart();
+ return;
+ }
+ final NotificationQueue notificationQueue = this;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+
+ log.info(String.format("NotificationQueue thread %s [%d] started",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+
+ // Thread is now started, notify the listener
+ handler.completedQueueStart();
+
+ try {
+ while (true) {
+
+ synchronized (notificationQueue) {
+ if (!isProcessingEvents) {
+ log.info(String.format("NotificationQueue has been requested to stop, thread %s [%d] stopping...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ notificationQueue.notify();
+ break;
+ }
+ }
+
+ // Callback may trigger exceptions in user code so catch anything here and live with it.
+ try {
+ doProcessEvents(sequenceId.getAndIncrement());
+ } catch (Exception e) {
+ log.error(String.format("NotificationQueue thread %s [%d] got an exception..",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()), e);
+ }
+ sleepALittle();
+ }
+ } catch (InterruptedException e) {
+ log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+ } catch (Throwable e) {
+ log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+ // Just to make it really obvious in the log
+ e.printStackTrace();
+ } finally {
+ handler.completedQueueStop();
+ log.info(String.format("NotificationQueue thread %s [%d] exited...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ }
+ }
+
+ private void sleepALittle() throws InterruptedException {
+ Thread.sleep(config.getNotificationSleepTimeMs());
+ }
+ });
+ }
+
+ private void doProcessEvents(int sequenceId) {
+ List<Notification> notifications = getReadyNotifications(sequenceId);
+ for (Notification cur : notifications) {
+ nbProcessedEvents.incrementAndGet();
+ handler.handleReadyNotification(cur.getNotificationKey());
+ }
+ // If anything happens before we get to clear those notifications, somebody else will pick them up
+ clearNotifications(notifications);
+ }
+
+ private String getFullQName() {
+ return svcName + ":" + queueName;
+ }
+
+ private void clearNotifications(final Collection<Notification> cleared) {
+
+ log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+ getFullQName(),
+ cleared.size()));
+
+ dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+ @Override
+ public Void inTransaction(NotificationSqlDao transactional,
+ TransactionStatus status) throws Exception {
+ for (Notification cur : cleared) {
+ transactional.clearNotification(cur.getId().toString(), hostname);
+ log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+ }
+ return null;
+ }
+ });
+ }
+
+ private List<Notification> getReadyNotifications(final int seqId) {
+
+ final Date now = clock.getUTCNow().toDate();
+ final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+ log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow = %s", getFullQName(), now));
+
+ List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+ @Override
+ public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+ TransactionStatus status) throws Exception {
+
+ List<Notification> claimedNotifications = new ArrayList<Notification>();
+ List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+ for (Notification cur : input) {
+ final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+ if (claimed) {
+ claimedNotifications.add(cur);
+ transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+ }
+ }
+ return claimedNotifications;
+ }
+ });
+
+ for (Notification cur : result) {
+ log.debug(String.format("NotificationQueue %sclaimed events %s",
+ getFullQName(), cur.getId()));
+ if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+ log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+ getFullQName(), cur, cur.getOwner()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
new file mode 100644
index 0000000..d1544c8
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+
+public interface NotificationQueueService {
+
+ public interface NotificationQueueHandler {
+ /**
+ * Called when the Notification thread has been started
+ */
+ public void completedQueueStart();
+
+ /**
+ * Called for each notification ready
+ *
+ * @param key the notification key associated to that notification entry
+ */
+ public void handleReadyNotification(String notificationKey);
+ /**
+ * Called right before the Notification thread is about to exit
+ */
+ public void completedQueueStop();
+ }
+
+ /**
+ * Creates a new NotificationQueue for a given associated with the given service and queueName
+ *
+ * @param svcName the name of the service using that queue
+ * @param queueName a name for that queue (unique per service)
+ * @param handler the handler required for notifying the caller of state change
+ * @param config the notification queue configuration
+ *
+ * @return a new NotificationQueue
+ */
+ NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config);
+}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index c68b18b..30471c7 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -34,4 +34,31 @@ CREATE TABLE tags (
PRIMARY KEY(id)
) ENGINE = innodb;
CREATE INDEX tags_by_object ON tags(object_id);
-CREATE UNIQUE INDEX tags_unique ON tags(tag_description_id, object_id);
\ No newline at end of file
+CREATE UNIQUE INDEX tags_unique ON tags(tag_description_id, object_id);
+
+DROP TABLE IF EXISTS notifications;
+CREATE TABLE notifications (
+ id int(11) unsigned NOT NULL AUTO_INCREMENT,
+ notification_id char(36) NOT NULL,
+ created_dt datetime NOT NULL,
+ notification_key varchar(256) NOT NULL,
+ effective_dt datetime NOT NULL,
+ processing_owner char(36) DEFAULT NULL,
+ processing_available_dt datetime DEFAULT NULL,
+ processing_state varchar(14) DEFAULT 'AVAILABLE',
+ PRIMARY KEY(id)
+) ENGINE=innodb;
+CREATE INDEX `idx_comp_where` ON notifications (`effective_dt`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX `idx_update` ON notifications (`notification_id`,`processing_state`,`processing_owner`,`processing_available_dt`);
+CREATE INDEX `idx_update1` ON notifications (`notification_id`,`processing_owner`);
+CREATE INDEX `idx_get_ready` ON notifications (`effective_dt`,`created_dt`,`id`);
+
+DROP TABLE IF EXISTS claimed_notifications;
+CREATE TABLE claimed_notifications (
+ id int(11) unsigned NOT NULL AUTO_INCREMENT,
+ sequence_id int(11) unsigned NOT NULL,
+ owner_id varchar(64) NOT NULL,
+ claimed_dt datetime NOT NULL,
+ notification_id char(36) NOT NULL,
+ PRIMARY KEY(id)
+) ENGINE=innodb;
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
new file mode 100644
index 0000000..5a44431
--- /dev/null
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -0,0 +1,83 @@
+group NotificationSqlDao;
+
+getReadyNotifications(now, max) ::= <<
+ select
+ notification_id
+ , notification_key
+ , created_dt
+ , effective_dt
+ , processing_owner
+ , processing_available_dt
+ , processing_state
+ from notifications
+ where
+ effective_dt \<= :now
+ and processing_state != 'PROCESSED'
+ and (processing_owner IS NULL OR processing_available_dt \<= :now)
+ order by
+ effective_dt asc
+ , created_dt asc
+ , id asc
+ limit :max
+ ;
+>>
+
+
+claimNotification(owner, next_available, notification_id, now) ::= <<
+ update notifications
+ set
+ processing_owner = :owner
+ , processing_available_dt = :next_available
+ , processing_state = 'IN_PROCESSING'
+ where
+ notification_id = :notification_id
+ and processing_state != 'PROCESSED'
+ and (processing_owner IS NULL OR processing_available_dt \<= :now)
+ ;
+>>
+
+clearNotification(notification_id, owner) ::= <<
+ update notifications
+ set
+ processing_owner = NULL
+ , processing_state = 'PROCESSED'
+ where
+ notification_id = :notification_id
+ and processing_owner = :owner
+ ;
+>>
+
+insertNotification() ::= <<
+ insert into notifications (
+ notification_id
+ , notification_key
+ , created_dt
+ , effective_dt
+ , processing_owner
+ , processing_available_dt
+ , processing_state
+ ) values (
+ :notification_id
+ , :notification_key
+ , :created_dt
+ , :effective_dt
+ , :processing_owner
+ , :processing_available_dt
+ , :processing_state
+ );
+>>
+
+
+insertClaimedHistory(sequence_id, owner, hostname, claimed_dt, notification_id) ::= <<
+ insert into claimed_notifications (
+ sequence_id
+ , owner_id
+ , claimed_dt
+ , notification_id
+ ) values (
+ :sequence_id
+ , :owner
+ , :claimed_dt
+ , :notification_id
+ );
+>>
diff --git a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
index 0fb473d..7698697 100644
--- a/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
+++ b/util/src/test/java/com/ning/billing/util/clock/ClockMock.java
@@ -76,6 +76,15 @@ public class ClockMock extends DefaultClock {
deltaFromRealityMs = delta;
}
+ public synchronized void addDeltaFromReality(long delta) {
+ if (deltaType != DeltaType.DELTA_ABS) {
+ throw new RuntimeException("ClockMock should be set with type DELTA_ABS");
+ }
+ deltaFromRealityDuration = null;
+ deltaFromRealitDurationEpsilon = 0;
+ deltaFromRealityMs += delta;
+ }
+
public synchronized void resetDeltaFromReality() {
deltaType = DeltaType.DELTA_NONE;
deltaFromRealityDuration = null;
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
new file mode 100644
index 0000000..bfd29ef
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq.dao;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.util.notificationq.DefaultNotification;
+import com.ning.billing.util.notificationq.Notification;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao.NotificationSqlMapper;
+
+@Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
+public class TestNotificationSqlDao {
+
+ private static AtomicInteger sequenceId = new AtomicInteger();
+
+ @Inject
+ private DBI dbi;
+
+ private NotificationSqlDao dao;
+
+ @BeforeClass(alwaysRun = true)
+ public void setup() {
+ dao = dbi.onDemand(NotificationSqlDao.class);
+ }
+
+
+ @BeforeTest
+ public void cleanupDb() {
+ dbi.withHandle(new HandleCallback<Void>() {
+
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.execute("delete from notifications");
+ handle.execute("delete from claimed_notifications");
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testBasic() throws InterruptedException {
+
+ final String ownerId = UUID.randomUUID().toString();
+
+ String notificationKey = UUID.randomUUID().toString();
+ DateTime effDt = new DateTime();
+ Notification notif = new DefaultNotification(notificationKey, effDt);
+ dao.insertNotification(notif);
+
+ Thread.sleep(1000);
+ DateTime now = new DateTime();
+ List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+ assertNotNull(notifications);
+ assertEquals(notifications.size(), 1);
+
+ Notification notification = notifications.get(0);
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner(), null);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.AVAILABLE);
+ assertEquals(notification.getNextAvailableDate(), null);
+
+ DateTime nextAvailable = now.plusMinutes(5);
+ int res = dao.claimNotification(ownerId, nextAvailable.toDate(), notification.getId().toString(), now.toDate());
+ assertEquals(res, 1);
+ dao.insertClaimedHistory(sequenceId.incrementAndGet(), ownerId, now.toDate(), notification.getId().toString());
+
+ notification = fetchNotification(notification.getId().toString());
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner().toString(), ownerId);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.IN_PROCESSING);
+ validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+ dao.clearNotification(notification.getId().toString(), ownerId);
+
+ notification = fetchNotification(notification.getId().toString());
+ assertEquals(notification.getNotificationKey(), notificationKey);
+ validateDate(notification.getEffectiveDate(), effDt);
+ assertEquals(notification.getOwner(), null);
+ assertEquals(notification.getProcessingState(), NotificationLifecycleState.PROCESSED);
+ validateDate(notification.getNextAvailableDate(), nextAvailable);
+
+ }
+
+ private Notification fetchNotification(final String notificationId) {
+ Notification res = dbi.withHandle(new HandleCallback<Notification>() {
+
+ @Override
+ public Notification withHandle(Handle handle) throws Exception {
+ Notification res = handle.createQuery(" select" +
+ " notification_id" +
+ ", notification_key" +
+ ", created_dt" +
+ ", effective_dt" +
+ ", processing_owner" +
+ ", processing_available_dt" +
+ ", processing_state" +
+ " from notifications " +
+ " where " +
+ " notification_id = '" + notificationId + "';")
+ .map(new NotificationSqlMapper())
+ .first();
+ return res;
+ }
+ });
+ return res;
+ }
+
+ private void validateDate(DateTime input, DateTime expected) {
+ if (input == null && expected != null) {
+ Assert.fail("Got input date null");
+ }
+ if (input != null && expected == null) {
+ Assert.fail("Was expecting null date");
+ }
+ expected = truncateAndUTC(expected);
+ input = truncateAndUTC(input);
+ Assert.assertEquals(input, expected);
+ }
+
+ private DateTime truncateAndUTC(DateTime input) {
+ if (input == null) {
+ return null;
+ }
+ DateTime result = input.minus(input.getMillisOfSecond());
+ return result.toDateTime(DateTimeZone.UTC);
+ }
+
+ public static class TestNotificationSqlDaoModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+ final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+ bind(DbiConfig.class).toInstance(config);
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
new file mode 100644
index 0000000..9495001
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummyObject.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.UUID;
+
+public class DummyObject {
+ private final String value;
+ private final UUID key;
+
+ public DummyObject(String value, UUID key) {
+ super();
+ this.value = value;
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public UUID getKey() {
+ return key;
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
new file mode 100644
index 0000000..83b1b20
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/DummySqlTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.skife.jdbi.v2.SQLStatement;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.sqlobject.Bind;
+import org.skife.jdbi.v2.sqlobject.Binder;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
+import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
+import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.ExternalizedSqlViaStringTemplate3;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+
+@ExternalizedSqlViaStringTemplate3()
+public interface DummySqlTest extends Transactional<DummySqlTest>, Transmogrifier, CloseMe {
+
+ @SqlUpdate
+ public void insertDummy(@Bind(binder = DummySqlTestBinder.class) DummyObject dummy);
+
+ @SqlQuery
+ @Mapper(DummySqlTestMapper.class)
+ public DummyObject getDummyFromId(@Bind("dummy_id") String dummyId);
+
+ public static class DummySqlTestBinder implements Binder<Bind, DummyObject> {
+ @Override
+ public void bind(@SuppressWarnings("rawtypes") SQLStatement stmt, Bind bind, DummyObject dummy) {
+ stmt.bind("dummy_id", dummy.getKey().toString());
+ stmt.bind("value", dummy.getValue());
+ }
+ }
+
+ public static class DummySqlTestMapper implements ResultSetMapper<DummyObject> {
+ @Override
+ public DummyObject map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException {
+ final UUID key = UUID.fromString(r.getString("dummy_id"));
+ final String value = r.getString("value");
+ return new DummyObject(value, key);
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
new file mode 100644
index 0000000..5703da1
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.skife.config.ConfigurationObjectFactory;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.ning.billing.dbi.DBIProvider;
+import com.ning.billing.dbi.DbiConfig;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
+public class TestNotificationQueue {
+
+ private final static Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
+
+ @Inject
+ private DBI dbi;
+
+ @Inject
+ private Clock clock;
+
+ private DummySqlTest dao;
+
+ // private NotificationQueue queue;
+
+
+ @BeforeClass(alwaysRun = true)
+ public void setup() {
+ dao = dbi.onDemand(DummySqlTest.class);
+ }
+
+ @BeforeTest
+ public void beforeTest() {
+ dbi.withHandle(new HandleCallback<Void>() {
+
+ @Override
+ public Void withHandle(Handle handle) throws Exception {
+ handle.execute("delete from notifications");
+ handle.execute("delete from claimed_notifications");
+ handle.execute("delete from dummy");
+ return null;
+ }
+ });
+ // Reset time to real value
+ ((ClockMock) clock).resetDeltaFromReality();
+ }
+
+ @AfterTest
+ public void afterTest() {
+
+ }
+
+ /**
+ * Verify that we can call start/stop on a disabled queue and that both start/stop callbacks are called
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSimpleQueueDisabled() throws InterruptedException {
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "dead",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(true, 100, 1, 10000));
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final NotificationQueue readyQueue) throws InterruptedException {
+ // Do nothing
+ }
+ });
+ assertTrue(true);
+ }
+
+ /**
+ * Test that we can post a notification in the future from a transaction and get the notification
+ * callback with the correct key when the time is ready
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testSimpleNotification() throws InterruptedException {
+
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(false, 100, 1, 10000));
+
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final NotificationQueue readyQueue) throws InterruptedException {
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final DateTime now = new DateTime();
+ final DateTime readyTime = now.plusMillis(2000);
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+
+ // Insert dummy to be processed in 2 sec'
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ readyQueue.recordFutureNotificationFromTransaction(transactional,
+ readyTime, notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ ((ClockMock) clock).setDeltaFromReality(3000);
+
+ // Notification should have kicked but give it at least a sec' for thread scheduling
+ int nbTry = 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+ if (expectedNotifications.get(notificationKey.toString())) {
+ success = true;
+ break;
+ }
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+ }
+ });
+ }
+
+ @Test
+ public void testManyNotifications() throws InterruptedException {
+ final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+
+ final TestStartStop testStartStop = new TestStartStop(false, false);
+ NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ @Override
+ public void completedQueueStop() {
+ testStartStop.stopped();
+ }
+ @Override
+ public void completedQueueStart() {
+ testStartStop.started();
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
+
+
+ executeTest(testStartStop, queue, new WithTest() {
+ @Override
+ public void test(final NotificationQueue readyQueue) throws InterruptedException {
+
+ final DateTime now = clock.getUTCNow();
+ final int MAX_NOTIFICATIONS = 100;
+ for (int i = 0; i < MAX_NOTIFICATIONS; i++) {
+
+ final int nextReadyTimeIncrementMs = 1000;
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final int currentIteration = i;
+
+ final NotificationKey notificationKey = new NotificationKey() {
+ @Override
+ public String toString() {
+ return key.toString();
+ }
+ };
+ expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ readyQueue.recordFutureNotificationFromTransaction(transactional,
+ now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ if (i == 0) {
+ ((ClockMock) clock).setDeltaFromReality(nextReadyTimeIncrementMs);
+ } else {
+ ((ClockMock) clock).addDeltaFromReality(nextReadyTimeIncrementMs);
+ }
+ }
+
+ // Wait a little longer since there are a lot of callback that need to happen
+ int nbTry = MAX_NOTIFICATIONS + 1;
+ boolean success = false;
+ do {
+ synchronized(expectedNotifications) {
+
+ Collection<Boolean> completed = Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+ @Override
+ public boolean apply(Boolean input) {
+ return input;
+ }
+ });
+
+ if (completed.size() == MAX_NOTIFICATIONS) {
+ success = true;
+ break;
+ }
+ //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ expectedNotifications.wait(1000);
+ }
+ } while (nbTry-- > 0);
+ assertEquals(success, true);
+ }
+ });
+ }
+
+
+ NotificationConfig getNotificationConfig(final boolean off,
+ final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+ return new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return sleepTime;
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return maxReadyEvents;
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return claimTimeMs;
+ }
+ };
+ }
+
+ private static class TestStartStop {
+ private boolean started;
+ private boolean stopped;
+
+ public TestStartStop(boolean started, boolean stopped) {
+ super();
+ this.started = started;
+ this.stopped = stopped;
+ }
+
+ public void started() {
+ synchronized(this) {
+ started = true;
+ notify();
+ }
+ }
+
+ public void stopped() {
+ synchronized(this) {
+ stopped = true;
+ notify();
+ }
+ }
+
+ public boolean waitForStartComplete(int timeoutMs) throws InterruptedException {
+ return waitForEventCompletion(timeoutMs, true);
+ }
+
+ public boolean waitForStopComplete(int timeoutMs) throws InterruptedException {
+ return waitForEventCompletion(timeoutMs, false);
+ }
+
+ private boolean waitForEventCompletion(int timeoutMs, boolean start) throws InterruptedException {
+ DateTime init = new DateTime();
+ synchronized(this) {
+ while (! ((start ? started : stopped))) {
+ wait(timeoutMs);
+ if (init.plusMillis(timeoutMs).isAfterNow()) {
+ break;
+ }
+ }
+ }
+ return (start ? started : stopped);
+ }
+ }
+
+ private interface WithTest {
+ public void test(NotificationQueue readyQueue) throws InterruptedException;
+ }
+
+ private void executeTest(final TestStartStop testStartStop,
+ NotificationQueue queue, WithTest test) throws InterruptedException{
+
+ queue.startQueue();
+ boolean started = testStartStop.waitForStartComplete(3000);
+ assertEquals(started, true);
+
+ test.test(queue);
+
+ queue.stopQueue();
+ boolean stopped = testStartStop.waitForStopComplete(3000);
+ assertEquals(stopped, true);
+ }
+
+
+ public static class TestNotificationQueueModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ bind(DBI.class).toProvider(DBIProvider.class).asEagerSingleton();
+ final DbiConfig config = new ConfigurationObjectFactory(System.getProperties()).build(DbiConfig.class);
+ bind(DbiConfig.class).toInstance(config);
+ bind(Clock.class).to(ClockMock.class);
+ }
+ }
+
+
+}
diff --git a/util/src/test/resources/com/ning/billing/util/ddl.sql b/util/src/test/resources/com/ning/billing/util/ddl.sql
new file mode 100644
index 0000000..50de498
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/ddl.sql
@@ -0,0 +1,6 @@
+DROP TABLE IF EXISTS dummy;
+CREATE TABLE dummy (
+ dummy_id char(36) NOT NULL,
+ value varchar(256) NOT NULL,
+ PRIMARY KEY(dummy_id)
+) ENGINE = innodb;
diff --git a/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
new file mode 100644
index 0000000..9a2e6e2
--- /dev/null
+++ b/util/src/test/resources/com/ning/billing/util/notificationq/DummySqlTest.sql.stg
@@ -0,0 +1,21 @@
+group DummySqlTest;
+
+insertDummy() ::= <<
+ insert into dummy (
+ dummy_id
+ , value
+ ) values (
+ :dummy_id
+ , :value
+ );
+>>
+
+getDummyFromId(dummy_id) ::= <<
+ select
+ dummy_id
+ , value
+ from dummy
+ where
+ dummy_id = :dummy_id
+ ;
+>>
\ No newline at end of file