killbill-memoizeit
Changes
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java 2(+1 -1)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 15(+8 -7)
util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java 207(+169 -38)
Details
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7e2d9cf..410fd97 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -59,6 +59,7 @@ import com.ning.billing.util.callcontext.DefaultCallContextFactory;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.clock.DefaultClock;
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -75,6 +76,18 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private EntitlementUserApi entitlementUserApi;
private BusinessTagDao tagDao;
+
+ private NotificationConfig config = new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return false;
+ }
+ @Override
+ public long getSleepTimeMs() {
+ return 3000;
+ }
+ };
+
@BeforeMethod(groups = "slow")
public void setUp() throws Exception {
final IDBI dbi = helper.getDBI();
@@ -92,7 +105,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao, accountEmailDao);
final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
final AddonUtils addonUtils = new AddonUtils(catalogService);
- final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+ final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
final EntitlementDao entitlementDao = new AuditedEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
final PlanAligner planAligner = new PlanAligner(catalogService);
final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
private final InternalCallContextFactory internalCallContextFactory;
private final AccountInternalApi accountApi;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
public void start() {
startQueue();
+ isStarted = true;
}
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
try {
final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 081132e..e035ba0 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
*
* Ning licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index c84d1b0..fd6154d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -58,6 +58,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
private final String hostname;
private final InternalCallContextFactory internalCallContextFactory;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -94,16 +96,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
this.hostname = Hostname.get();
this.internalCallContextFactory = internalCallContextFactory;
+ this.isStarted = false;
}
@Override
public void start() {
startQueue();
+ isStarted = true;
}
@Override
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -129,6 +134,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 811846e..4fe0000 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,7 +55,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public List<Notification> getReadyNotifications(@Bind("now") Date now,
@Bind("owner") String owner,
@Bind("max") int max,
- @Bind("queueName") String queueName,
@InternalTenantContextBinder final InternalTenantContext context);
@SqlQuery
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index b20c1a7..461410d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,68 +17,55 @@
package com.ning.billing.util.notificationq;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.UUID;
-import javax.annotation.Nullable;
-
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.util.Hostname;
import com.ning.billing.util.config.NotificationConfig;
-import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
-public class DefaultNotificationQueue extends NotificationQueueBase {
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class DefaultNotificationQueue implements NotificationQueue {
private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
private final NotificationSqlDao dao;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final String hostname;
+
+ private final String svcName;
+ private final String queueName;
+
+ private final ObjectMapper objectMapper;
+
+ private final NotificationQueueHandler handler;
- public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
- final NotificationQueueHandler handler, final NotificationConfig config,
- final InternalCallContextFactory internalCallContextFactory) {
- super(clock, svcName, queueName, handler, config);
+ private final NotificationQueueService notificationQueueService;
+
+ private volatile boolean isStarted;
+
+ public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+ final IDBI dbi, final NotificationQueueService notificationQueueService) {
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
this.dao = dbi.onDemand(NotificationSqlDao.class);
- this.internalCallContextFactory = internalCallContextFactory;
+ this.hostname = Hostname.get();
+ this.notificationQueueService = notificationQueueService;
+ this.objectMapper = new ObjectMapper();
}
- @Override
- public int doProcessEvents() {
- logDebug("ENTER doProcessEvents");
- // Finding and claiming notifications is not done per tenant (yet?)
- final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
- if (notifications.size() == 0) {
- logDebug("EXIT doProcessEvents");
- return 0;
- }
-
- logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
- int result = 0;
- for (final Notification cur : notifications) {
- getNbProcessedEvents().incrementAndGet();
- logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
- result++;
- clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
- logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- }
-
- return result;
- }
+
@Override
public void recordFutureNotification(final DateTime futureNotificationTime,
@@ -104,50 +91,12 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
final NotificationSqlDao thisDao,
final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
+ final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
thisDao.insertNotification(notification, context);
}
- private void clearNotification(final Notification cleared, final InternalCallContext context) {
- dao.clearNotification(cleared.getId().toString(), getHostname(), context);
- }
-
- private List<Notification> getReadyNotifications(final InternalCallContext context) {
- final Date now = getClock().getUTCNow().toDate();
- final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
- final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
-
- final List<Notification> claimedNotifications = new ArrayList<Notification>();
- for (final Notification cur : input) {
- logDebug("about to claim notification %s, key = %s for time %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-
- final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
- logDebug("claimed notification %s, key = %s for time %s result = %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
-
- if (claimed) {
- claimedNotifications.add(cur);
- dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
- }
- }
-
- for (final Notification cur : claimedNotifications) {
- if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
- log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
- }
- }
-
- return claimedNotifications;
- }
- private void logDebug(final String format, final Object... args) {
- if (log.isDebugEnabled()) {
- final String realDebug = String.format(format, args);
- log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
- }
- }
@Override
public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
@@ -164,7 +113,41 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
dao.removeNotification(notificationId.toString(), context);
}
- private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
- return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ @Override
+ public String getFullQName() {
+ return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
+ }
+
+ @Override
+ public String getServiceName() {
+ return svcName;
}
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ notificationQueueService.startQueue();
+ isStarted = true;
+ }
+
+ @Override
+ public void stopQueue() {
+ notificationQueueService.stopQueue();
+ isStarted = false;
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..277cadf 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.google.inject.Inject;
public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
private final IDBI dbi;
- private final InternalCallContextFactory internalCallContextFactory;
@Inject
- public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
- super(clock);
+ public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationConfig config,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
this.dbi = dbi;
- this.internalCallContextFactory = internalCallContextFactory;
}
+
@Override
protected NotificationQueue createNotificationQueueInternal(final String svcName,
final String queueName,
- final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+ final NotificationQueueHandler handler) {
+ return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 1444425..f5ab4bb 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -24,6 +24,7 @@ import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.queue.QueueLifecycle;
public interface NotificationQueue extends QueueLifecycle {
@@ -67,13 +68,6 @@ public interface NotificationQueue extends QueueLifecycle {
public void removeNotification(final UUID notificationId,
final InternalCallContext context);
- /**
- * This is only valid when the queue has been configured with isNotificationProcessingOff is true
- * In which case, it will callback users for all the ready notifications.
- *
- * @return the number of entries we processed
- */
- public int processReadyNotification();
/**
* @return the name of that queue
@@ -89,4 +83,6 @@ public interface NotificationQueue extends QueueLifecycle {
* @return the queue name associated
*/
public String getQueueName();
+
+ public NotificationQueueHandler getHandler();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d6df6dd..f5f493d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -19,8 +19,9 @@ package com.ning.billing.util.notificationq;
import org.joda.time.DateTime;
import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
public interface NotificationQueueHandler {
@@ -89,8 +90,7 @@ public interface NotificationQueueService {
throws NoSuchNotificationQueue;
/**
- * @param services
* @return the number of processed notifications
*/
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+ public int triggerManualQueueProcessing(final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..4ad413c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -22,26 +22,25 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
- protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
- protected final Clock clock;
-
- private final Map<String, NotificationQueue> queues;
@Inject
- public NotificationQueueServiceBase(final Clock clock) {
- this.clock = clock;
- this.queues = new TreeMap<String, NotificationQueue>();
+ public NotificationQueueServiceBase(final Clock clock, final NotificationConfig config, final IDBI dbi,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
}
@Override
@@ -61,7 +60,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
svcName, queueName));
}
- result = createNotificationQueueInternal(svcName, queueName, handler, config);
+ result = createNotificationQueueInternal(svcName, queueName, handler);
queues.put(compositeName, result);
}
return result;
@@ -96,33 +95,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("NotificationQueueServiceBase");
+ sb.append("{queues=").append(queues);
+ sb.append('}');
+ return sb.toString();
+ }
+
+
//
// Test ONLY
//
@Override
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+ public int triggerManualQueueProcessing(final Boolean keepRunning) {
int result = 0;
- List<NotificationQueue> manualQueues = null;
- if (services == null) {
- manualQueues = new ArrayList<NotificationQueue>(queues.values());
- } else {
- final Joiner join = Joiner.on(",");
- join.join(services);
-
- log.info("Trigger manual processing for services {} ", join.toString());
- manualQueues = new LinkedList<NotificationQueue>();
- synchronized (queues) {
- for (final String svc : services) {
- addQueuesForService(manualQueues, svc);
- }
- }
- }
+ List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
for (final NotificationQueue cur : manualQueues) {
int processedNotifications = 0;
do {
- processedNotifications = cur.processReadyNotification();
+ doProcessEventsWithLimit(1);
log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
result += processedNotifications;
} while (keepRunning && processedNotifications > 0);
@@ -130,28 +125,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
return result;
}
- private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
- for (final String cur : queues.keySet()) {
- if (cur.startsWith(svcName)) {
- result.add(queues.get(cur));
- }
- }
- }
-
protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
- String queueName, NotificationQueueHandler handler,
- NotificationConfig config);
-
- public static String getCompositeName(final String svcName, final String queueName) {
- return svcName + ":" + queueName;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("NotificationQueueServiceBase");
- sb.append("{queues=").append(queues);
- sb.append('}');
- return sb.toString();
- }
+ String queueName, NotificationQueueHandler handler);
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final int nbThreads;
private final Executor executor;
- private final String svcName;
+ private final String svcQName;
private final long sleepTimeMs;
private boolean isProcessingEvents;
private int curActiveThreads;
protected final ObjectMapper objectMapper;
-
- public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+ public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
- this.svcName = svcName;
+ this.svcQName = svcQName;
this.sleepTimeMs = config.getSleepTimeMs();
this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
}
+
@Override
public void startQueue() {
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
log.info(String.format("%s: Starting with %d threads",
- svcName, nbThreads));
+ svcQName, nbThreads));
for (int i = 0; i < nbThreads; i++) {
executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
public void run() {
log.info(String.format("%s: Thread %s [%d] starting",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()));
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
doProcessEvents();
} catch (Exception e) {
log.warn(String.format("%s: Thread %s [%d] got an exception, catching and moving on...",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()), e);
}
sleepALittle();
}
} catch (InterruptedException e) {
- log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
} catch (Throwable e) {
- log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+ log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
} finally {
- log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
synchronized (thePersistentQ) {
curActiveThreads--;
}
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
if (!success) {
- log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
} else {
- log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
}
} catch (InterruptedException e) {
- log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+ log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
}
}
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
} catch (InterruptedException ignore) {
- log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
} finally {
if (remaining > 0) {
- log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
} else {
- log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
}
curActiveThreads = 0;
}
}
-
+
protected <T> T deserializeEvent(final String className, final String json) {
try {
final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
}
+ public abstract int doProcessEvents();
+ public abstract boolean isStarted();
-
- @Override
- public abstract int doProcessEvents();
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
*/
public void stopQueue();
- /**
- * Processes event from queue
- */
- public int doProcessEvents();
+ public boolean isStarted();
}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 980d9c9..4e36ef9 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -131,7 +131,7 @@ CREATE TABLE notifications (
PRIMARY KEY(record_id)
) ENGINE=innodb;
CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 8287ea4..70651e8 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -23,7 +23,6 @@ getReadyNotifications() ::= <<
FORCE INDEX (idx_comp_where)
where
effective_date \<= :now
- and queue_name = :queueName
and processing_state != 'PROCESSED'
and processing_state != 'REMOVED'
and (processing_owner IS NULL OR processing_available_date \<= :now)
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 54be599..ea5c536 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -86,7 +86,7 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
Thread.sleep(1000);
final DateTime now = new DateTime();
- final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+ final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
assertNotNull(notifications);
assertEquals(notifications.size(), 1);
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 2a59bfd..bffb033 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import com.ning.billing.util.Hostname;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
@@ -34,13 +35,27 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueue
import com.fasterxml.jackson.databind.ObjectMapper;
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final String hostname;
private final TreeSet<Notification> notifications;
+ private final Clock clock;
+ private final String svcName;
+ private final String queueName;
+ private final NotificationQueueHandler handler;
+
+ private volatile boolean isStarted;
+
+ public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler) {
+
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.clock = clock;
+ this.hostname = Hostname.get();
- public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
- super(clock, svcName, queueName, handler, config);
notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@Override
public int compare(final Notification o1, final Notification o2) {
@@ -57,7 +72,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
+ final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
null, 0L);
synchronized (notifications) {
notifications.add(notification);
@@ -70,81 +85,107 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
recordFutureNotification(futureNotificationTime, accountId, notificationKey, context);
}
- public List<Notification> getPendingEvents() {
- final List<Notification> result = new ArrayList<Notification>();
+ @Override
+ public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+ final List<Notification> toClearNotifications = new ArrayList<Notification>();
for (final Notification notification : notifications) {
- if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
- result.add(notification);
+ if (notification.getNotificationKey().equals(key.toString())) {
+ toClearNotifications.add(notification);
}
}
- return result;
+ synchronized (notifications) {
+ if (toClearNotifications.size() > 0) {
+ notifications.removeAll(toClearNotifications);
+ }
+ }
}
@Override
- public int doProcessEvents() {
- final int result;
- final List<Notification> processedNotifications = new ArrayList<Notification>();
- final List<Notification> oldNotifications = new ArrayList<Notification>();
+ public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- final List<Notification> readyNotifications = new ArrayList<Notification>();
+ final List<Notification> result = new ArrayList<Notification>();
synchronized (notifications) {
- for (final Notification cur : notifications) {
- if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
- readyNotifications.add(cur);
+ for (Notification cur : notifications) {
+ if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+ result.add(cur);
}
}
}
+ return result;
+ }
- result = readyNotifications.size();
- for (final Notification cur : readyNotifications) {
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
- final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
- "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
- PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
- cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
- cur.getAccountRecordId(), cur.getTenantRecordId());
- oldNotifications.add(cur);
- processedNotifications.add(processedNotification);
- }
-
+ @Override
+ public void removeNotification(final UUID notificationId, final InternalCallContext context) {
synchronized (notifications) {
- if (oldNotifications.size() > 0) {
- notifications.removeAll(oldNotifications);
- }
-
- if (processedNotifications.size() > 0) {
- notifications.addAll(processedNotifications);
+ for (Notification cur : notifications) {
+ if (cur.getId().equals(notificationId)) {
+ notifications.remove(cur);
+ break;
+ }
}
}
- return result;
}
@Override
- public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
- final List<Notification> toClearNotifications = new ArrayList<Notification>();
- for (final Notification notification : notifications) {
- if (notification.getNotificationKey().equals(key.toString())) {
- toClearNotifications.add(notification);
- }
- }
+ public String getFullQName() {
+ return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+ }
- synchronized (notifications) {
- if (toClearNotifications.size() > 0) {
- notifications.removeAll(toClearNotifications);
- }
- }
+ @Override
+ public String getServiceName() {
+ return svcName;
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- return null;
+ public String getQueueName() {
+ return queueName;
}
@Override
- public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ isStarted = true;
+ }
+
+ @Override
+ public void stopQueue() {
+ isStarted = false;
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
}
+
+
+ public List<Notification> getReadyNotifications() {
+ final int result;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ final List<Notification> readyNotifications = new ArrayList<Notification>();
+ synchronized (notifications) {
+ for (final Notification cur : notifications) {
+ if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ readyNotifications.add(cur);
+ }
+ }
+ }
+ return readyNotifications;
+ }
+
+ public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded ) {
+ synchronized (notifications) {
+ notifications.removeAll(toBeremoved);
+ notifications.addAll(toBeAdded);
+ }
+ }
+
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..45acc5b 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
package com.ning.billing.util.notificationq;
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import com.google.inject.Inject;
public class MockNotificationQueueService extends NotificationQueueServiceBase {
@Inject
- public MockNotificationQueueService(final Clock clock) {
- super(clock);
+ public MockNotificationQueueService(final Clock clock, final NotificationConfig config) {
+ super(clock, config, null, null);
+ }
+
+
+ @Override
+ protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+ final NotificationQueueHandler handler) {
+ return new MockNotificationQueue(clock, svcName, queueName, handler);
}
+
@Override
- protected NotificationQueue createNotificationQueueInternal(final String svcName,
- final String queueName, final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+ public int doProcessEvents() {
+
+ int result = 0;
+
+ for (NotificationQueue cur : queues.values()) {
+ result += doProcessEventsForQueue((MockNotificationQueue) cur);
+ }
+ return result;
+ }
+
+ private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+ int result = 0;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ List<Notification> readyNotifications = queue.getReadyNotifications();
+ for (final Notification cur : readyNotifications) {
+ final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
+ final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+ "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+ PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+ cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
+ cur.getAccountRecordId(), cur.getTenantRecordId());
+ oldNotifications.add(cur);
+ processedNotifications.add(processedNotification);
+ result++;
+ }
+
+ queue.markProcessedNotifications(oldNotifications, processedNotifications);
+ return result;
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index ee19da5..2faa969 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -38,12 +38,12 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.io.IOUtils;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -62,6 +62,7 @@ import static org.testng.Assert.assertEquals;
@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
+
private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
private static final UUID accountId = UUID.randomUUID();
@@ -75,11 +76,18 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
@Inject
private Clock clock;
+ @Inject
+ NotificationQueueService queueService;
+
+ @Inject
+ NotificationConfig config;
+
private DummySqlTest dao;
private int eventsReceived;
private static final class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
+
private final String value;
@JsonCreator
@@ -96,6 +104,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
public int compareTo(TestNotificationKey arg0) {
return value.compareTo(arg0.value);
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(value);
+ return sb.toString();
+ }
}
@BeforeSuite(groups = "slow")
@@ -133,20 +148,21 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- log.info("Handler received key: " + notificationKey);
-
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 1, 10000),
- new InternalCallContextFactory(dbi, clock));
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey);
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ config);
+
queue.startQueue();
@@ -188,21 +204,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
}
@Test(groups = "slow")
- public void testManyNotifications() throws InterruptedException {
+ public void testManyNotifications() throws Exception {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
+
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey.toString());
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ config);
queue.startQueue();
@@ -216,7 +235,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DummyObject obj = new DummyObject("foo", key);
final int currentIteration = i;
- final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+ final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
expectedNotifications.put(notificationKey, Boolean.FALSE);
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@@ -255,7 +274,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
success = true;
break;
}
- //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
expectedNotifications.wait(1000);
}
} while (nbTry-- > 0);
@@ -281,19 +300,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
- final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
- final NotificationConfig config = new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return false;
- }
+ final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, new InternalCallContextFactory(dbi, clock));
- @Override
- public long getSleepTimeMs() {
- return 10;
- }
- };
final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
@@ -367,39 +376,29 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
}
- NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
- return new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return off;
- }
-
- @Override
- public long getSleepTimeMs() {
- return sleepTime;
- }
- };
- }
@Test(groups = "slow")
- public void testRemoveNotifications() throws InterruptedException {
+ public void testRemoveNotifications() throws Exception {
final UUID key = UUID.randomUUID();
final NotificationKey notificationKey = new TestNotificationKey(key.toString());
final UUID key2 = UUID.randomUUID();
final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
- log.info("Received notification with key: " + notificationKey);
- eventsReceived++;
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
+
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "remove",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+ log.info("Received notification with key: " + notificationKey);
+ eventsReceived++;
+ }
+ }
+ },
+ config);
+
+
queue.startQueue();
@@ -440,10 +439,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
queue.stopQueue();
}
+
+ static NotificationConfig getNotificationConfig(final boolean off, final long sleepTime) {
+ return new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return sleepTime;
+ }
+ };
+ }
+
public static class TestNotificationQueueModule extends AbstractModule {
+
@Override
protected void configure() {
- bind(Clock.class).to(ClockMock.class);
+ bind(Clock.class).to(ClockMock.class).asEagerSingleton();
final MysqlTestingHelper helper = KillbillTestSuiteWithEmbeddedDB.getMysqlTestingHelper();
bind(MysqlTestingHelper.class).toInstance(helper);
@@ -451,6 +466,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
bind(IDBI.class).toInstance(dbi);
final IDBI otherDbi = helper.getDBI();
bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+ bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ bind(NotificationConfig.class).toInstance(getNotificationConfig(false, 100));
}
}
}