killbill-aplcache
Changes
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java 15(+7 -8)
util/pom.xml 5(+5 -0)
Details
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
index 1cd0c60..e6b7dcc 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
@@ -170,7 +170,7 @@ public class TestBasic {
return ctd;
}
- @Test(groups = "fast", enabled = true)
+ @Test(groups = "fast", enabled = false)
public void testSimple() throws Exception {
Account account = accountUserApi.createAccount(getAccountData(), null, null);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 503fd1a..2204274 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -16,25 +16,24 @@
package com.ning.billing.entitlement.engine.dao;
-import com.google.inject.Inject;
-import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.api.user.SubscriptionFactory;
-import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-
-import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
+import com.google.inject.Inject;
+import com.ning.billing.entitlement.api.user.SubscriptionFactory;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEntitlementDao {
private final ResetSqlDao resetDao;
@Inject
- public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+ public MockEntitlementDaoSql(IDBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
super(dbi, clock, factory, notificationQueueService);
this.resetDao = dbi.onDemand(ResetSqlDao.class);
}
util/pom.xml 5(+5 -0)
diff --git a/util/pom.xml b/util/pom.xml
index 2b23ebb..f582868 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -97,6 +97,11 @@
<artifactId>management-dbfiles</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 818d831..a297581 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -49,7 +49,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
//
@SqlQuery
@Mapper(NotificationSqlMapper.class)
- public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+ public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max, @Bind("queue_name") String queueName);
@SqlUpdate
public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
@@ -75,8 +75,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
stmt.bind("created_dt", getDate(new DateTime()));
stmt.bind("notification_key", evt.getNotificationKey());
stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+ stmt.bind("queue_name", evt.getQueueName());
stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
- stmt.bind("processing_owner", (String) null);
+ stmt.bind("processing_owner", evt.getOwner());
stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
}
}
@@ -95,12 +96,13 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final UUID id = UUID.fromString(r.getString("notification_id"));
final String notificationKey = r.getString("notification_key");
+ final String queueName = r.getString("queue_name");
final DateTime effectiveDate = getDate(r, "effective_dt");
final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
final String processingOwner = r.getString("processing_owner");
final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
- return new DefaultNotification(id, processingOwner, nextAvailableDate,
+ return new DefaultNotification(id, processingOwner, queueName, nextAvailableDate,
processingState, notificationKey, effectiveDate);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 2946e13..3c4c476 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -24,26 +24,28 @@ public class DefaultNotification implements Notification {
private final UUID id;
private final String owner;
+ private final String queueName;
private final DateTime nextAvailableDate;
private final NotificationLifecycleState lifecycleState;
private final String notificationKey;
private final DateTime effectiveDate;
- public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
+ public DefaultNotification(UUID id, String owner, String queueName, DateTime nextAvailableDate,
NotificationLifecycleState lifecycleState,
String notificationKey, DateTime effectiveDate) {
super();
this.id = id;
this.owner = owner;
+ this.queueName = queueName;
this.nextAvailableDate = nextAvailableDate;
this.lifecycleState = lifecycleState;
this.notificationKey = notificationKey;
this.effectiveDate = effectiveDate;
}
- public DefaultNotification(String notificationKey, DateTime effectiveDate) {
- this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
+ this(UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
}
@Override
@@ -94,4 +96,9 @@ public class DefaultNotification implements Notification {
public DateTime getEffectiveDate() {
return effectiveDate;
}
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index bfa3094..b6f9b09 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -53,7 +53,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
final DateTime futureNotificationTime, final NotificationKey notificationKey) {
NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
- Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
transactionalNotificationDao.insertNotification(notification);
}
@@ -92,7 +92,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
TransactionStatus status) throws Exception {
List<Notification> claimedNotifications = new ArrayList<Notification>();
- List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+ List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
for (Notification cur : input) {
final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
if (claimed) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 651469b..8749fa0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -28,4 +28,8 @@ public interface Notification extends NotificationLifecycle {
public String getNotificationKey();
public DateTime getEffectiveDate();
+
+ public String getQueueName();
+
+
}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 3dfc9be..cf2ceb2 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -42,7 +42,8 @@ CREATE TABLE notifications (
created_dt datetime NOT NULL,
notification_key varchar(256) NOT NULL,
effective_dt datetime NOT NULL,
- processing_owner char(36) DEFAULT NULL,
+ queue_name char(64) NOT NULL,
+ processing_owner char(50) DEFAULT NULL,
processing_available_dt datetime DEFAULT NULL,
processing_state varchar(14) DEFAULT 'AVAILABLE',
PRIMARY KEY(id)
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 5a44431..efe7e4f 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -6,12 +6,14 @@ getReadyNotifications(now, max) ::= <<
, notification_key
, created_dt
, effective_dt
+ , queue_name
, processing_owner
, processing_available_dt
, processing_state
from notifications
where
effective_dt \<= :now
+ and queue_name = :queue_name
and processing_state != 'PROCESSED'
and (processing_owner IS NULL OR processing_available_dt \<= :now)
order by
@@ -53,6 +55,7 @@ insertNotification() ::= <<
, notification_key
, created_dt
, effective_dt
+ , queue_name
, processing_owner
, processing_available_dt
, processing_state
@@ -61,6 +64,7 @@ insertNotification() ::= <<
, :notification_key
, :created_dt
, :effective_dt
+ , :queue_name
, :processing_owner
, :processing_available_dt
, :processing_state
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 6d99504..d744caf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -103,12 +103,12 @@ public class TestNotificationSqlDao {
String notificationKey = UUID.randomUUID().toString();
DateTime effDt = new DateTime();
- Notification notif = new DefaultNotification(notificationKey, effDt);
+ Notification notif = new DefaultNotification("testBasic",notificationKey, effDt);
dao.insertNotification(notif);
Thread.sleep(1000);
DateTime now = new DateTime();
- List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+ List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3, "testBasic");
assertNotNull(notifications);
assertEquals(notifications.size(), 1);
@@ -152,6 +152,7 @@ public class TestNotificationSqlDao {
", notification_key" +
", created_dt" +
", effective_dt" +
+ ", queue_name" +
", processing_owner" +
", processing_available_dt" +
", processing_state" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 7ee2e10..0fe65ec 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -52,7 +52,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
public void recordFutureNotificationFromTransaction(
Transmogrifier transactionalDao, DateTime futureNotificationTime,
NotificationKey notificationKey) {
- Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ Notification notification = new DefaultNotification("MockQueue", notificationKey.toString(), futureNotificationTime);
synchronized(notifications) {
notifications.add(notification);
}
@@ -75,7 +75,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
for (Notification cur : readyNotifications) {
handler.handleReadyNotification(cur.getNotificationKey());
- DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index bcfa44a..6db48d0 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -16,24 +16,34 @@
package com.ning.billing.util.notificationq;
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.testng.Assert.assertEquals;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.inject.AbstractModule;
@@ -44,10 +54,9 @@ import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
-import static org.testng.Assert.assertEquals;
-
@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
public class TestNotificationQueue {
+ Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
@Inject
private IDBI dbi;
@@ -58,6 +67,8 @@ public class TestNotificationQueue {
private Clock clock;
private DummySqlTest dao;
+
+ private int eventsReceived;
// private NotificationQueue queue;
@@ -96,11 +107,10 @@ public class TestNotificationQueue {
/**
* Test that we can post a notification in the future from a transaction and get the notification
* callback with the correct key when the time is ready
- *
- * @throws InterruptedException
+ * @throws Exception
*/
- @Test
- public void testSimpleNotification() throws InterruptedException {
+ @Test(groups={"fast"}, enabled = true)
+ public void testSimpleNotification() throws Exception {
final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
@@ -109,7 +119,9 @@ public class TestNotificationQueue {
@Override
public void handleReadyNotification(String notificationKey) {
synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
+ log.info("Handler received key: " + notificationKey);
+
+ expectedNotifications.put(notificationKey.toString(), Boolean.TRUE);
expectedNotifications.notify();
}
}
@@ -141,6 +153,8 @@ public class TestNotificationQueue {
transactional.insertDummy(obj);
queue.recordFutureNotificationFromTransaction(transactional,
readyTime, notificationKey);
+ log.info("Posted key: " + notificationKey);
+
return null;
}
});
@@ -149,7 +163,14 @@ public class TestNotificationQueue {
((ClockMock) clock).setDeltaFromReality(3000);
// Notification should have kicked but give it at least a sec' for thread scheduling
-
+ await().atMost(1, MINUTES).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return expectedNotifications.get(notificationKey.toString());
+ }
+ });
+
+ Assert.assertTrue(expectedNotifications.get(notificationKey.toString()));
}
@Test
@@ -233,6 +254,126 @@ public class TestNotificationQueue {
assertEquals(success, true);
}
+
+ /**
+ * Test that we can post a notification in the future from a transaction and get the notification
+ * callback with the correct key when the time is ready
+ * @throws Exception
+ */
+ @Test(groups={"fast"}, enabled = true)
+ public void testMultipleHandlerNotification() throws Exception {
+
+ final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
+ final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
+
+ NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
+
+ NotificationConfig config=new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return false;
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return 10;
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return 1;
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return 60000;
+ }
+ };
+
+
+ final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ log.info("Fred received key: " + notificationKey);
+ expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
+ eventsReceived++;
+ }
+ },
+ config);
+
+ final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ log.info("Barney received key: " + notificationKey);
+ expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
+ eventsReceived++;
+ }
+ },
+ config);
+
+ queueFred.startQueue();
+// We don't start Barney so it can never pick up notifications
+
+
+ final UUID key = UUID.randomUUID();
+ final DummyObject obj = new DummyObject("foo", key);
+ final DateTime now = new DateTime();
+ final DateTime readyTime = now.plusMillis(2000);
+ final NotificationKey notificationKeyFred = new NotificationKey() {
+ @Override
+ public String toString() {
+ return "Fred" ;
+ }
+ };
+
+
+ final NotificationKey notificationKeyBarney = new NotificationKey() {
+ @Override
+ public String toString() {
+ return "Barney" ;
+ }
+ };
+
+ expectedNotificationsFred.put(notificationKeyFred.toString(), Boolean.FALSE);
+ expectedNotificationsFred.put(notificationKeyBarney.toString(), Boolean.FALSE);
+
+
+ // Insert dummy to be processed in 2 sec'
+ dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+ @Override
+ public Void inTransaction(DummySqlTest transactional,
+ TransactionStatus status) throws Exception {
+
+ transactional.insertDummy(obj);
+ queueFred.recordFutureNotificationFromTransaction(transactional,
+ readyTime, notificationKeyFred);
+ log.info("posted key: " + notificationKeyFred.toString());
+ queueBarney.recordFutureNotificationFromTransaction(transactional,
+ readyTime, notificationKeyBarney);
+ log.info("posted key: " + notificationKeyBarney.toString());
+
+ return null;
+ }
+ });
+
+ // Move time in the future after the notification effectiveDate
+ ((ClockMock) clock).setDeltaFromReality(3000);
+
+ // Note the timeout is short on this test, but expected behaviour is that it times out.
+ // We are checking that the Fred queue does not pick up the Barney event
+ try {
+ await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return eventsReceived >= 2;
+ }
+ });
+ Assert.fail("There should only have been one event for the queue to pick up - it got more than that");
+ } catch (Exception e) {
+ // expected behavior
+ }
+
+ Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
+ Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
+
+ }
NotificationConfig getNotificationConfig(final boolean off,
final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {