diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 19923b3..c89f908 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -24,26 +24,27 @@ import java.util.List;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.ning.billing.config.NotificationConfig;
-import com.ning.billing.util.bus.dao.BusEventEntry;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
public class DefaultNotificationQueue extends NotificationQueueBase {
+ private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
- protected final NotificationSqlDao dao;
-
- public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ private final NotificationSqlDao dao;
+ public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
+ final NotificationQueueHandler handler, final NotificationConfig config) {
super(clock, svcName, queueName, handler, config);
this.dao = dbi.onDemand(NotificationSqlDao.class);
}
@Override
public int doProcessEvents() {
-
logDebug("ENTER doProcessEvents");
final List<Notification> notifications = getReadyNotifications();
if (notifications.size() == 0) {
@@ -51,24 +52,22 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
return 0;
}
- logDebug("START processing %d events at time %s", notifications.size(), clock.getUTCNow().toDate());
+ logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
int result = 0;
for (final Notification cur : notifications) {
- nbProcessedEvents.incrementAndGet();
- logDebug("handling notification %s, key = %s for time %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- handler.handleReadyNotification(key, cur.getEffectiveDate());
+ getNbProcessedEvents().incrementAndGet();
+ logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
+ final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ getHandler().handleReadyNotification(key, cur.getEffectiveDate());
result++;
clearNotification(cur);
- logDebug("done handling notification %s, key = %s for time %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
+ logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
}
+
return result;
}
-
@Override
public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao);
@@ -76,47 +75,50 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
@Override
public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
- final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+ final DateTime futureNotificationTime,
+ final NotificationKey notificationKey) throws IOException {
final NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao);
}
-
- private void recordFutureNotificationInternal(final DateTime futureNotificationTime, final NotificationKey notificationKey, final NotificationSqlDao thisDao) throws IOException {
+
+ private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
+ final NotificationKey notificationKey,
+ final NotificationSqlDao thisDao) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+ final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
thisDao.insertNotification(notification);
}
private void clearNotification(final Notification cleared) {
- dao.clearNotification(cleared.getId().toString(), hostname);
+ dao.clearNotification(cleared.getId().toString(), getHostname());
}
private List<Notification> getReadyNotifications() {
-
- final Date now = clock.getUTCNow().toDate();
- final Date nextAvailable = clock.getUTCNow().plus(CLAIM_TIME_MS).toDate();
-
- final List<Notification> input = dao.getReadyNotifications(now, hostname, CLAIM_TIME_MS, getFullQName());
+ final Date now = getClock().getUTCNow().toDate();
+ final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
+ final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName());
final List<Notification> claimedNotifications = new ArrayList<Notification>();
for (final Notification cur : input) {
logDebug("about to claim notification %s, key = %s for time %s",
cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- final boolean claimed = (dao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+
+ final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now) == 1);
logDebug("claimed notification %s, key = %s for time %s result = %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), Boolean.valueOf(claimed));
+ cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
+
if (claimed) {
claimedNotifications.add(cur);
- dao.insertClaimedHistory(hostname, now, cur.getId().toString());
+ dao.insertClaimedHistory(getHostname(), now, cur.getId().toString());
}
}
for (final Notification cur : claimedNotifications) {
- if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
- log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
- getFullQName(), cur, cur.getOwner()));
+ if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
+ log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
}
}
+
return claimedNotifications;
}
@@ -130,6 +132,5 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
@Override
public void removeNotificationsByKey(final NotificationKey notificationKey) {
dao.removeNotificationsByKey(notificationKey.toString());
-
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 5e7d607..edb4f2e 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -19,7 +19,6 @@ package com.ning.billing.util.notificationq;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
@@ -33,10 +32,10 @@ import com.ning.billing.util.notificationq.NotificationQueueService.Notification
import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
private final TreeSet<Notification> notifications;
- ObjectMapper objectMapper = new ObjectMapper();
-
public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
super(clock, svcName, queueName, handler, config);
notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@@ -52,18 +51,16 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+ public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+ final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, futureNotificationTime);
synchronized (notifications) {
notifications.add(notification);
}
}
@Override
- public void recordFutureNotificationFromTransaction(
- final Transmogrifier transactionalDao, final DateTime futureNotificationTime,
- final NotificationKey notificationKey) throws IOException {
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
recordFutureNotification(futureNotificationTime, notificationKey);
}
@@ -75,23 +72,20 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result.add(notification);
}
}
+
return result;
}
@Override
public int doProcessEvents() {
-
- int result = 0;
-
+ final int result;
final List<Notification> processedNotifications = new ArrayList<Notification>();
final List<Notification> oldNotifications = new ArrayList<Notification>();
final List<Notification> readyNotifications = new ArrayList<Notification>();
synchronized (notifications) {
- final Iterator<Notification> it = notifications.iterator();
- while (it.hasNext()) {
- final Notification cur = it.next();
- if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ for (final Notification cur : notifications) {
+ if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
readyNotifications.add(cur);
}
}
@@ -99,15 +93,16 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result = readyNotifications.size();
for (final Notification cur : readyNotifications) {
-
-
- NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- handler.handleReadyNotification(key, cur.getEffectiveDate());
- final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED,
- cur.getNotificationKeyClass(), cur.getNotificationKey(), cur.getEffectiveDate());
+ final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ getHandler().handleReadyNotification(key, cur.getEffectiveDate());
+ final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+ "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+ PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+ cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
}
+
synchronized (notifications) {
if (oldNotifications.size() > 0) {
notifications.removeAll(oldNotifications);
@@ -117,6 +112,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
notifications.addAll(processedNotifications);
}
}
+
return result;
}
@@ -128,11 +124,11 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
toClearNotifications.add(notification);
}
}
+
synchronized (notifications) {
if (toClearNotifications.size() > 0) {
notifications.removeAll(toClearNotifications);
}
}
-
}
}