killbill-aplcache

Fix for Nitification Queue bug that ensures that notifications

1/27/2012 6:47:59 PM

Details

diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
index 1cd0c60..e6b7dcc 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/inv_ent/TestBasic.java
@@ -170,7 +170,7 @@ public class TestBasic {
         return ctd;
     }
 
-    @Test(groups = "fast", enabled = true)
+    @Test(groups = "fast", enabled = false)
     public void testSimple() throws Exception {
 
         Account account = accountUserApi.createAccount(getAccountData(), null, null);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 503fd1a..2204274 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -16,25 +16,24 @@
 
 package com.ning.billing.entitlement.engine.dao;
 
-import com.google.inject.Inject;
-import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.api.user.SubscriptionFactory;
-import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.notificationq.NotificationQueueService;
-
-import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
 import org.skife.jdbi.v2.sqlobject.mixins.CloseMe;
 import org.skife.jdbi.v2.sqlobject.mixins.Transactional;
 
+import com.google.inject.Inject;
+import com.ning.billing.entitlement.api.user.SubscriptionFactory;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
 public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEntitlementDao {
 
     private final ResetSqlDao resetDao;
 
     @Inject
-    public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+    public MockEntitlementDaoSql(IDBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
         super(dbi, clock, factory, notificationQueueService);
         this.resetDao = dbi.onDemand(ResetSqlDao.class);
     }

util/pom.xml 5(+5 -0)

diff --git a/util/pom.xml b/util/pom.xml
index 2b23ebb..f582868 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -97,6 +97,11 @@
             <artifactId>management-dbfiles</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 818d831..a297581 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -49,7 +49,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
     //
     @SqlQuery
     @Mapper(NotificationSqlMapper.class)
-    public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max);
+    public List<Notification> getReadyNotifications(@Bind("now") Date now, @Bind("max") int max, @Bind("queue_name") String queueName);
 
     @SqlUpdate
     public int claimNotification(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("notification_id") String eventId, @Bind("now") Date now);
@@ -75,8 +75,9 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
             stmt.bind("created_dt", getDate(new DateTime()));
             stmt.bind("notification_key", evt.getNotificationKey());
             stmt.bind("effective_dt", getDate(evt.getEffectiveDate()));
+            stmt.bind("queue_name", evt.getQueueName());
             stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
-            stmt.bind("processing_owner", (String) null);
+            stmt.bind("processing_owner", evt.getOwner());
             stmt.bind("processing_state", NotificationLifecycleState.AVAILABLE.toString());
         }
     }
@@ -95,12 +96,13 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
 
             final UUID id = UUID.fromString(r.getString("notification_id"));
             final String notificationKey = r.getString("notification_key");
+            final String queueName = r.getString("queue_name");
             final DateTime effectiveDate = getDate(r, "effective_dt");
             final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
             final String processingOwner = r.getString("processing_owner");
             final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
 
-            return new DefaultNotification(id, processingOwner, nextAvailableDate,
+            return new DefaultNotification(id, processingOwner, queueName, nextAvailableDate,
                     processingState, notificationKey, effectiveDate);
 
         }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 2946e13..3c4c476 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -24,26 +24,28 @@ public class DefaultNotification implements Notification {
 
     private final UUID id;
     private final String owner;
+    private final String queueName;
     private final DateTime nextAvailableDate;
     private final NotificationLifecycleState lifecycleState;
     private final String notificationKey;
     private final DateTime effectiveDate;
 
 
-    public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
+    public DefaultNotification(UUID id, String owner, String queueName, DateTime nextAvailableDate,
             NotificationLifecycleState lifecycleState,
             String notificationKey, DateTime effectiveDate) {
         super();
         this.id = id;
         this.owner = owner;
+        this.queueName = queueName;
         this.nextAvailableDate = nextAvailableDate;
         this.lifecycleState = lifecycleState;
         this.notificationKey = notificationKey;
         this.effectiveDate = effectiveDate;
     }
 
-    public DefaultNotification(String notificationKey, DateTime effectiveDate) {
-        this(UUID.randomUUID(), null, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+    public DefaultNotification(String queueName, String notificationKey, DateTime effectiveDate) {
+        this(UUID.randomUUID(), null, queueName, null, NotificationLifecycleState.AVAILABLE, notificationKey, effectiveDate);
     }
 
     @Override
@@ -94,4 +96,9 @@ public class DefaultNotification implements Notification {
     public DateTime getEffectiveDate() {
         return effectiveDate;
     }
+
+	@Override
+	public String getQueueName() {
+		return queueName;
+	}
 }
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index bfa3094..b6f9b09 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -53,7 +53,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
     public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
             final DateTime futureNotificationTime, final NotificationKey notificationKey) {
         NotificationSqlDao transactionalNotificationDao =  transactionalDao.become(NotificationSqlDao.class);
-        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        Notification notification = new DefaultNotification(getFullQName(), notificationKey.toString(), futureNotificationTime);
         transactionalNotificationDao.insertNotification(notification);
     }
 
@@ -92,7 +92,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
                     TransactionStatus status) throws Exception {
 
                 List<Notification> claimedNotifications = new ArrayList<Notification>();
-                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+                List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents(), getFullQName());
                 for (Notification cur : input) {
                     final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
                     if (claimed) {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 651469b..8749fa0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -28,4 +28,8 @@ public interface Notification extends NotificationLifecycle {
     public String getNotificationKey();
 
     public DateTime getEffectiveDate();
+    
+    public String getQueueName();
+    
+
 }
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 3dfc9be..cf2ceb2 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -42,7 +42,8 @@ CREATE TABLE notifications (
     created_dt datetime NOT NULL,
 	notification_key varchar(256) NOT NULL,
     effective_dt datetime NOT NULL,
-    processing_owner char(36) DEFAULT NULL,
+    queue_name char(64) NOT NULL,
+    processing_owner char(50) DEFAULT NULL,
     processing_available_dt datetime DEFAULT NULL,
     processing_state varchar(14) DEFAULT 'AVAILABLE',
     PRIMARY KEY(id)
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 5a44431..efe7e4f 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -6,12 +6,14 @@ getReadyNotifications(now, max) ::= <<
     , notification_key
       , created_dt
       , effective_dt
+      , queue_name
       , processing_owner
       , processing_available_dt
       , processing_state
     from notifications
     where
       effective_dt \<= :now
+      and queue_name = :queue_name
       and processing_state != 'PROCESSED'
       and (processing_owner IS NULL OR processing_available_dt \<= :now)
     order by
@@ -53,6 +55,7 @@ insertNotification() ::= <<
     , notification_key
       , created_dt
       , effective_dt
+      , queue_name
       , processing_owner
       , processing_available_dt
       , processing_state
@@ -61,6 +64,7 @@ insertNotification() ::= <<
       , :notification_key
       , :created_dt
       , :effective_dt
+      , :queue_name
       , :processing_owner
       , :processing_available_dt
       , :processing_state
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 6d99504..d744caf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -103,12 +103,12 @@ public class TestNotificationSqlDao {
 
         String notificationKey = UUID.randomUUID().toString();
         DateTime effDt = new DateTime();
-        Notification notif = new DefaultNotification(notificationKey, effDt);
+        Notification notif = new DefaultNotification("testBasic",notificationKey, effDt);
         dao.insertNotification(notif);
 
         Thread.sleep(1000);
         DateTime now = new DateTime();
-        List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3);
+        List<Notification> notifications = dao.getReadyNotifications(now.toDate(), 3, "testBasic");
         assertNotNull(notifications);
         assertEquals(notifications.size(), 1);
 
@@ -152,6 +152,7 @@ public class TestNotificationSqlDao {
                 		", notification_key" +
                 		", created_dt" +
                 		", effective_dt" +
+                		", queue_name" +
                 		", processing_owner" +
                 		", processing_available_dt" +
                 		", processing_state" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 7ee2e10..0fe65ec 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -52,7 +52,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
     public void recordFutureNotificationFromTransaction(
             Transmogrifier transactionalDao, DateTime futureNotificationTime,
             NotificationKey notificationKey) {
-        Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+        Notification notification = new DefaultNotification("MockQueue", notificationKey.toString(), futureNotificationTime);
         synchronized(notifications) {
             notifications.add(notification);
         }
@@ -75,7 +75,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
             }
             for (Notification cur : readyNotifications) {
                 handler.handleReadyNotification(cur.getNotificationKey());
-                DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+                DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, "MockQueue", clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
                 oldNotifications.add(cur);
                 processedNotifications.add(processedNotification);
 
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index bcfa44a..6db48d0 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -16,24 +16,34 @@
 
 package com.ning.billing.util.notificationq;
 
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.testng.Assert.assertEquals;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
+
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.inject.AbstractModule;
@@ -44,10 +54,9 @@ import com.ning.billing.util.clock.ClockMock;
 import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
 import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
 
-import static org.testng.Assert.assertEquals;
-
 @Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
 public class TestNotificationQueue {
+	Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
 	@Inject
 	private IDBI dbi;
 
@@ -58,6 +67,8 @@ public class TestNotificationQueue {
 	private Clock clock;
 
 	private DummySqlTest dao;
+	
+	private int eventsReceived;
 
 	// private NotificationQueue queue;
 
@@ -96,11 +107,10 @@ public class TestNotificationQueue {
 	/**
 	 * Test that we can post a notification in the future from a transaction and get the notification
 	 * callback with the correct key when the time is ready
-	 *
-	 * @throws InterruptedException
+	 * @throws Exception 
 	 */
-	@Test
-	public void testSimpleNotification() throws InterruptedException {
+	@Test(groups={"fast"}, enabled = true)
+	public void testSimpleNotification() throws Exception {
 
 		final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
 
@@ -109,7 +119,9 @@ public class TestNotificationQueue {
 			@Override
 			public void handleReadyNotification(String notificationKey) {
 				synchronized (expectedNotifications) {
-					expectedNotifications.put(notificationKey, Boolean.TRUE);
+	            	log.info("Handler received key: " + notificationKey);
+
+					expectedNotifications.put(notificationKey.toString(), Boolean.TRUE);
 					expectedNotifications.notify();
 				}
 			}
@@ -141,6 +153,8 @@ public class TestNotificationQueue {
 				transactional.insertDummy(obj);
 				queue.recordFutureNotificationFromTransaction(transactional,
 						readyTime, notificationKey);
+            	log.info("Posted key: " + notificationKey);
+
 				return null;
 			}
 		});
@@ -149,7 +163,14 @@ public class TestNotificationQueue {
 		((ClockMock) clock).setDeltaFromReality(3000);
 
 		// Notification should have kicked but give it at least a sec' for thread scheduling
-		  
+	    await().atMost(1, MINUTES).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                return expectedNotifications.get(notificationKey.toString());
+            }
+        });
+
+	Assert.assertTrue(expectedNotifications.get(notificationKey.toString()));
 	}
 
 	@Test
@@ -233,6 +254,126 @@ public class TestNotificationQueue {
 		assertEquals(success, true);
 
 	}
+	
+	/**
+	 * Test that we can post a notification in the future from a transaction and get the notification
+	 * callback with the correct key when the time is ready
+	 * @throws Exception 
+	 */
+	@Test(groups={"fast"}, enabled = true)
+	public void testMultipleHandlerNotification() throws Exception {
+
+		final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
+		final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
+		
+		NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
+		
+		NotificationConfig config=new NotificationConfig() {
+            @Override
+            public boolean isNotificationProcessingOff() {
+                return false;
+            }
+            @Override
+            public long getNotificationSleepTimeMs() {
+                return 10;
+            }
+            @Override
+            public int getDaoMaxReadyEvents() {
+                return 1;
+            }
+            @Override
+            public long getDaoClaimTimeMs() {
+                return 60000;
+            }
+		};
+		
+		
+		final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
+                @Override
+                public void handleReadyNotification(String notificationKey) {
+                	log.info("Fred received key: " + notificationKey);
+                	expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
+                	eventsReceived++;
+                }
+            },
+            config);
+		
+		final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+            @Override
+            public void handleReadyNotification(String notificationKey) {
+             	log.info("Barney received key: " + notificationKey);
+            	expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
+            	eventsReceived++;
+            }
+        },
+        config);
+
+		queueFred.startQueue();
+//		We don't start Barney so it can never pick up notifications
+		
+		
+		final UUID key = UUID.randomUUID();
+		final DummyObject obj = new DummyObject("foo", key);
+		final DateTime now = new DateTime();
+		final DateTime readyTime = now.plusMillis(2000);
+		final NotificationKey notificationKeyFred = new NotificationKey() {
+			@Override
+			public String toString() {
+				return "Fred" ;
+			}
+		};
+
+		
+		final NotificationKey notificationKeyBarney = new NotificationKey() {
+			@Override
+			public String toString() {
+				return "Barney" ;
+			}
+		};
+
+		expectedNotificationsFred.put(notificationKeyFred.toString(), Boolean.FALSE);
+		expectedNotificationsFred.put(notificationKeyBarney.toString(), Boolean.FALSE);
+
+
+		// Insert dummy to be processed in 2 sec'
+		dao.inTransaction(new Transaction<Void, DummySqlTest>() {
+			@Override
+			public Void inTransaction(DummySqlTest transactional,
+					TransactionStatus status) throws Exception {
+
+				transactional.insertDummy(obj);
+				queueFred.recordFutureNotificationFromTransaction(transactional,
+						readyTime, notificationKeyFred);
+				log.info("posted key: " + notificationKeyFred.toString());
+				queueBarney.recordFutureNotificationFromTransaction(transactional,
+						readyTime, notificationKeyBarney);
+				log.info("posted key: " + notificationKeyBarney.toString());
+
+				return null;
+			}
+		});
+
+		// Move time in the future after the notification effectiveDate
+		((ClockMock) clock).setDeltaFromReality(3000);
+
+		// Note the timeout is short on this test, but expected behaviour is that it times out.
+		// We are checking that the Fred queue does not pick up the Barney event
+		try {
+			await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+				@Override
+				public Boolean call() throws Exception {
+					return eventsReceived >= 2;
+				}
+			});
+			Assert.fail("There should only have been one event for the queue to pick up - it got more than that");
+		} catch (Exception e) {
+			// expected behavior
+		}
+
+		Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
+		Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
+		  
+	}
 
 	NotificationConfig getNotificationConfig(final boolean off,
 			final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {