killbill-aplcache
Changes
entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java 26(+21 -5)
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/ApiEventProcessorBase.java 251(+0 -251)
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/DefaultApiEventProcessor.java 63(+0 -63)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java 146(+79 -67)
entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg 79(+7 -72)
entitlement/src/test/java/com/ning/billing/entitlement/engine/core/MockApiEventProcessorMemory.java 63(+0 -63)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java 113(+71 -42)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java 21(+14 -7)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 37(+7 -30)
Details
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
index c2f5878..19b851d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/test/DefaultEntitlementTestApi.java
@@ -18,7 +18,11 @@ package com.ning.billing.entitlement.api.test;
import com.google.inject.Inject;
import com.ning.billing.config.EntitlementConfig;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
+import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,20 +32,32 @@ public class DefaultEntitlementTestApi implements EntitlementTestApi {
private final static Logger log = LoggerFactory.getLogger(DefaultEntitlementTestApi.class);
- private final EventNotifier apiEventProcessor;
private final EntitlementConfig config;
+ private final NotificationQueueService notificationQueueService;
@Inject
- public DefaultEntitlementTestApi(EventNotifier apiEventProcessor, EntitlementConfig config) {
- this.apiEventProcessor = apiEventProcessor;
+ public DefaultEntitlementTestApi(NotificationQueueService notificationQueueService, EntitlementConfig config) {
this.config = config;
+ this.notificationQueueService = notificationQueueService;
}
@Override
public void doProcessReadyEvents(UUID [] subscriptionsIds, Boolean recursive, Boolean oneEventOnly) {
if (config.isEventProcessingOff()) {
log.warn("Running event processing loop");
- apiEventProcessor.processAllReadyEvents(subscriptionsIds, recursive, oneEventOnly);
+ NotificationQueue queue = getNotificationQueue();
+ queue.processReadyNotification();
+
+ }
+ }
+
+ private NotificationQueue getNotificationQueue() {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ return subscritionEventQueue;
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3eb5dd3..7bd1124 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -16,6 +16,9 @@
package com.ning.billing.entitlement.engine.core;
+import java.util.UUID;
+
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +48,16 @@ import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.eventbus.EventBus;
import com.ning.billing.util.eventbus.EventBus.EventBusException;
+import com.ning.billing.util.notificationq.NotificationConfig;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotficationQueueAlreadyExists;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
public class Engine implements EventListener, EntitlementService {
- private static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
+ public static final String NOTIFICATION_QUEUE_NAME = "subscription-events";
+ public static final String ENTITLEMENT_SERVICE_NAME = "entitlement-service";
private final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000; // 10 secs
private final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000; // 1 sec
@@ -58,33 +67,36 @@ public class Engine implements EventListener, EntitlementService {
private final Clock clock;
private final EntitlementDao dao;
- private final EventNotifier apiEventProcessor;
private final PlanAligner planAligner;
private final EntitlementUserApi userApi;
private final EntitlementBillingApi billingApi;
private final EntitlementTestApi testApi;
private final EntitlementMigrationApi migrationApi;
private final EventBus eventBus;
+ private final EntitlementConfig config;
+ private final NotificationQueueService notificationQueueService;
private boolean startedNotificationThread;
+ private boolean stoppedNotificationThread;
+ private NotificationQueue subscritionEventQueue;
@Inject
- public Engine(Clock clock, EntitlementDao dao, EventNotifier apiEventProcessor,
- PlanAligner planAligner, EntitlementConfig config, DefaultEntitlementUserApi userApi,
+ public Engine(Clock clock, EntitlementDao dao, PlanAligner planAligner,
+ EntitlementConfig config, DefaultEntitlementUserApi userApi,
DefaultEntitlementBillingApi billingApi, DefaultEntitlementTestApi testApi,
- DefaultEntitlementMigrationApi migrationApi, EventBus eventBus) {
+ DefaultEntitlementMigrationApi migrationApi, EventBus eventBus,
+ NotificationQueueService notificationQueueService) {
super();
this.clock = clock;
this.dao = dao;
- this.apiEventProcessor = apiEventProcessor;
this.planAligner = planAligner;
this.userApi = userApi;
this.testApi = testApi;
this.billingApi = billingApi;
this.migrationApi = migrationApi;
+ this.config = config;
this.eventBus = eventBus;
-
- this.startedNotificationThread = false;
+ this.notificationQueueService = notificationQueueService;
}
@Override
@@ -92,20 +104,75 @@ public class Engine implements EventListener, EntitlementService {
return ENTITLEMENT_SERVICE_NAME;
}
-
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void initialize() {
+
+ try {
+ this.stoppedNotificationThread = false;
+ this.startedNotificationThread = false;
+ subscritionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
+ NOTIFICATION_QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(String notificationKey) {
+ EntitlementEvent event = dao.getEventById(UUID.fromString(notificationKey));
+ if (event == null) {
+ log.warn("Failed to extract event for notification key {}", notificationKey);
+ } else {
+ processEventReady(event);
+ }
+ }
+
+ @Override
+ public void completedQueueStop() {
+ synchronized (this) {
+ stoppedNotificationThread = true;
+ this.notifyAll();
+ }
+ }
+ @Override
+ public void completedQueueStart() {
+ synchronized (this) {
+ startedNotificationThread = true;
+ this.notifyAll();
+ }
+ }
+ },
+ new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isEventProcessingOff();
+ }
+ @Override
+ public long getNotificationSleepTimeMs() {
+ return config.getNotificationSleepTimeMs();
+ }
+ @Override
+ public int getDaoMaxReadyEvents() {
+ return config.getDaoMaxReadyEvents();
+ }
+ @Override
+ public long getDaoClaimTimeMs() {
+ return config.getDaoMaxReadyEvents();
+ }
+ });
+ } catch (NotficationQueueAlreadyExists e) {
+ throw new RuntimeException(e);
+ }
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
public void start() {
- apiEventProcessor.startNotifications(this);
+ subscritionEventQueue.startQueue();
waitForNotificationStartCompletion();
}
@LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
public void stop() {
- apiEventProcessor.stopNotifications();
+ if (subscritionEventQueue != null) {
+ subscritionEventQueue.stopQueue();
+ waitForNotificationStopCompletion();
+ }
startedNotificationThread = false;
}
@@ -133,6 +200,9 @@ public class Engine implements EventListener, EntitlementService {
@Override
public void processEventReady(EntitlementEvent event) {
+ if (!event.isActive()) {
+ return;
+ }
SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId());
if (subscription == null) {
log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
@@ -148,23 +218,20 @@ public class Engine implements EventListener, EntitlementService {
}
}
- //
- // We want to ensure the notification thread is indeed started when we return from start()
- //
- @Override
- public void completedNotificationStart() {
- synchronized (this) {
- startedNotificationThread = true;
- this.notifyAll();
- }
+ private void waitForNotificationStartCompletion() {
+ waitForNotificationEventCompletion(true);
}
- private void waitForNotificationStartCompletion() {
+ private void waitForNotificationStopCompletion() {
+ waitForNotificationEventCompletion(false);
+ }
+
+ private void waitForNotificationEventCompletion(boolean startEvent) {
long ini = System.nanoTime();
synchronized(this) {
do {
- if (startedNotificationThread) {
+ if ((startEvent ? startedNotificationThread : stoppedNotificationThread)) {
break;
}
try {
@@ -173,14 +240,18 @@ public class Engine implements EventListener, EntitlementService {
Thread.currentThread().interrupt();
throw new EntitlementError(e);
}
- } while (!startedNotificationThread &&
+ } while (!(startEvent ? startedNotificationThread : stoppedNotificationThread) &&
(System.nanoTime() - ini) / NANO_TO_MS < MAX_NOTIFICATION_THREAD_WAIT_MS);
- if (!startedNotificationThread) {
- log.error("Could not start notification thread in %d msec !!!", MAX_NOTIFICATION_THREAD_WAIT_MS);
+ if (!(startEvent ? startedNotificationThread : stoppedNotificationThread)) {
+ log.error("Could not {} notification thread in {} msec !!!",
+ (startEvent ? "start" : "stop"),
+ MAX_NOTIFICATION_THREAD_WAIT_MS);
throw new EntitlementError("Failed to start service!!");
}
- log.info("Notification thread has been started in {} ms", (System.nanoTime() - ini) / NANO_TO_MS);
+ log.info("Notification thread has been {} in {} ms",
+ (startEvent ? "started" : "stopped"),
+ (System.nanoTime() - ini) / NANO_TO_MS);
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
index 7a84c51..e9962d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventListener.java
@@ -24,5 +24,4 @@ public interface EventListener {
public void processEventReady(EntitlementEvent event);
- public void completedNotificationStart();
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index b118110..ea62b84 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -54,14 +54,12 @@ public interface EntitlementDao {
// Event apis
public void createNextPhaseEvent(UUID subscriptionId, EntitlementEvent nextPhase);
+ public EntitlementEvent getEventById(UUID eventId);
+
public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId);
public List<EntitlementEvent> getPendingEventsForSubscription(UUID subscriptionId);
- public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId);
-
- public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared);
-
// Subscription creation, cancellation, changePlan apis
public void createSubscription(SubscriptionData subscription, List<EntitlementEvent> initialEvents);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index fc4db0f..72a71d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -17,7 +17,6 @@
package com.ning.billing.entitlement.engine.dao;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -25,7 +24,6 @@ import java.util.UUID;
import com.google.inject.Inject;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.config.EntitlementConfig;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -35,16 +33,23 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
import com.ning.billing.entitlement.events.user.ApiEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.entitlement.exceptions.EntitlementError;
-import com.ning.billing.util.Hostname;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,19 +62,17 @@ public class EntitlementSqlDao implements EntitlementDao {
private final SubscriptionSqlDao subscriptionsDao;
private final BundleSqlDao bundlesDao;
private final EventSqlDao eventsDao;
- private final EntitlementConfig config;
- private final String hostname;
private final SubscriptionFactory factory;
+ private final NotificationQueueService notificationQueueService;
@Inject
- public EntitlementSqlDao(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+ public EntitlementSqlDao(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
this.clock = clock;
- this.config = config;
this.factory = factory;
this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
this.eventsDao = dbi.onDemand(EventSqlDao.class);
this.bundlesDao = dbi.onDemand(BundleSqlDao.class);
- this.hostname = Hostname.get();
+ this.notificationQueueService = notificationQueueService;
}
@Override
@@ -146,11 +149,24 @@ public class EntitlementSqlDao implements EntitlementDao {
TransactionStatus status) throws Exception {
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
dao.insertEvent(nextPhase);
+ recordFutureNotificationFromTransaction(dao,
+ nextPhase.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return nextPhase.getId().toString();
+ }
+ });
return null;
}
});
}
+ @Override
+ public EntitlementEvent getEventById(UUID eventId) {
+ return eventsDao.getEventById(eventId.toString());
+ }
+
@Override
public List<EntitlementEvent> getEventsForSubscription(UUID subscriptionId) {
@@ -165,61 +181,6 @@ public class EntitlementSqlDao implements EntitlementDao {
return results;
}
- @Override
- public List<EntitlementEvent> getEventsReady(final UUID ownerId, final int sequenceId) {
-
- final Date now = clock.getUTCNow().toDate();
- final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
- log.debug(String.format("EntitlementDao getEventsReady START effectiveNow = %s", now));
-
- List<EntitlementEvent> events = eventsDao.inTransaction(new Transaction<List<EntitlementEvent>, EventSqlDao>() {
-
- @Override
- public List<EntitlementEvent> inTransaction(EventSqlDao dao,
- TransactionStatus status) throws Exception {
-
- List<EntitlementEvent> claimedEvents = new ArrayList<EntitlementEvent>();
- List<EntitlementEvent> input = dao.getReadyEvents(now, config.getDaoMaxReadyEvents());
- for (EntitlementEvent cur : input) {
- final boolean claimed = (dao.claimEvent(ownerId.toString(), nextAvailable, cur.getId().toString(), now) == 1);
- if (claimed) {
- claimedEvents.add(cur);
- dao.insertClaimedHistory(sequenceId, ownerId.toString(), hostname, now, cur.getId().toString());
- }
- }
- return claimedEvents;
- }
- });
-
- for (EntitlementEvent cur : events) {
- log.debug(String.format("EntitlementDao %s [host %s] claimed events %s", ownerId, hostname, cur.getId()));
- if (cur.getOwner() != null && !cur.getOwner().equals(ownerId)) {
- log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
- }
- }
- return events;
- }
-
- @Override
- public void clearEventsReady(final UUID ownerId, final Collection<EntitlementEvent> cleared) {
-
- log.debug(String.format("EntitlementDao clearEventsReady START cleared size = %d", cleared.size()));
-
- eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
-
- @Override
- public Void inTransaction(EventSqlDao dao,
- TransactionStatus status) throws Exception {
- // STEPH Same here batch would nice
- for (EntitlementEvent cur : cleared) {
- dao.clearEvent(cur.getId().toString(), ownerId.toString());
- log.debug(String.format("EntitlementDao %s [host %s] cleared events %s", ownerId, hostname, cur.getId()));
- }
- return null;
- }
- });
- }
@Override
public void createSubscription(final SubscriptionData subscription,
@@ -234,8 +195,16 @@ public class EntitlementSqlDao implements EntitlementDao {
dao.insertSubscription(subscription);
// STEPH batch as well
EventSqlDao eventsDaoFromSameTranscation = dao.become(EventSqlDao.class);
- for (EntitlementEvent cur : initialEvents) {
+ for (final EntitlementEvent cur : initialEvents) {
eventsDaoFromSameTranscation.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
return null;
}
@@ -252,6 +221,14 @@ public class EntitlementSqlDao implements EntitlementDao {
cancelNextChangeEventFromTransaction(subscriptionId, dao);
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
dao.insertEvent(cancelEvent);
+ recordFutureNotificationFromTransaction(dao,
+ cancelEvent.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cancelEvent.getId().toString();
+ }
+ });
return null;
}
});
@@ -281,8 +258,16 @@ public class EntitlementSqlDao implements EntitlementDao {
if (existingCancelId != null) {
dao.unactiveEvent(existingCancelId.toString(), now);
- for (EntitlementEvent cur : uncancelEvents) {
+ for (final EntitlementEvent cur : uncancelEvents) {
dao.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
}
return null;
@@ -298,8 +283,16 @@ public class EntitlementSqlDao implements EntitlementDao {
TransactionStatus status) throws Exception {
cancelNextChangeEventFromTransaction(subscriptionId, dao);
cancelNextPhaseEventFromTransaction(subscriptionId, dao);
- for (EntitlementEvent cur : changeEvents) {
+ for (final EntitlementEvent cur : changeEvents) {
dao.insertEvent(cur);
+ recordFutureNotificationFromTransaction(dao,
+ cur.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
}
return null;
}
@@ -372,8 +365,16 @@ public class EntitlementSqlDao implements EntitlementDao {
SubscriptionBundleData bundleData = curBundle.getData();
for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
SubscriptionData subData = curSubscription.getData();
- for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+ for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
transEventDao.insertEvent(curEvent);
+ recordFutureNotificationFromTransaction(transEventDao,
+ curEvent.getEffectiveDate(),
+ new NotificationKey() {
+ @Override
+ public String toString() {
+ return curEvent.getId().toString();
+ }
+ });
}
transSubDao.insertSubscription(subData);
}
@@ -384,6 +385,7 @@ public class EntitlementSqlDao implements EntitlementDao {
});
}
+
@Override
public void undoMigration(final UUID accountId) {
@@ -412,4 +414,14 @@ public class EntitlementSqlDao implements EntitlementDao {
transBundleDao.removeBundle(curBundle.getId().toString());
}
}
+
+ private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
index de61217..5f485e5 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EventSqlDao.java
@@ -19,7 +19,6 @@ package com.ning.billing.entitlement.engine.dao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
import com.ning.billing.entitlement.events.EventBaseBuilder;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
import com.ning.billing.entitlement.events.phase.PhaseEventBuilder;
import com.ning.billing.entitlement.events.phase.PhaseEventData;
@@ -49,43 +48,31 @@ import java.util.UUID;
@ExternalizedSqlViaStringTemplate3()
public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transmogrifier {
- //
- // APIs for event notifications
- //
@SqlQuery
- @Mapper(IEventSqlMapper.class)
- public List<EntitlementEvent> getReadyEvents(@Bind("now") Date now, @Bind("max") int max);
+ @Mapper(EventSqlMapper.class)
+ public EntitlementEvent getEventById(@Bind("event_id") String eventId);
@SqlUpdate
- public int claimEvent(@Bind("owner") String owner, @Bind("next_available") Date nextAvailable, @Bind("event_id") String eventId, @Bind("now") Date now);
+ public void insertEvent(@Bind(binder = EventSqlDaoBinder.class) EntitlementEvent evt);
@SqlUpdate
public void removeEvents(@Bind("subscription_id") String subscriptionId);
@SqlUpdate
- public void clearEvent(@Bind("event_id") String eventId, @Bind("owner") String owner);
-
- @SqlUpdate
- public void insertEvent(@Bind(binder = IEventSqlDaoBinder.class) EntitlementEvent evt);
-
- @SqlUpdate
- public void insertClaimedHistory(@Bind("sequence_id") int sequenceId, @Bind("owner_id") String ownerId, @Bind("hostname") String hostname, @Bind("claimed_dt") Date clainedDate, @Bind("event_id") String eventId);
-
- @SqlUpdate
public void unactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
@SqlUpdate
public void reactiveEvent(@Bind("event_id")String eventId, @Bind("now") Date now);
@SqlQuery
- @Mapper(IEventSqlMapper.class)
+ @Mapper(EventSqlMapper.class)
public List<EntitlementEvent> getFutureActiveEventForSubscription(@Bind("subscription_id") String subscriptionId, @Bind("now") Date now);
@SqlQuery
- @Mapper(IEventSqlMapper.class)
+ @Mapper(EventSqlMapper.class)
public List<EntitlementEvent> getEventsForSubscription(@Bind("subscription_id") String subscriptionId);
- public static class IEventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
+ public static class EventSqlDaoBinder implements Binder<Bind, EntitlementEvent> {
private Date getDate(DateTime dateTime) {
return dateTime == null ? null : dateTime.toDate();
@@ -106,13 +93,10 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
stmt.bind("plist_name", (evt.getType() == EventType.API_USER) ? ((ApiEvent) evt).getPriceList() : null);
stmt.bind("current_version", evt.getActiveVersion());
stmt.bind("is_active", evt.isActive());
- stmt.bind("processing_available_dt", getDate(evt.getNextAvailableDate()));
- stmt.bind("processing_owner", (String) null);
- stmt.bind("processing_state", EventLifecycleState.AVAILABLE.toString());
}
}
- public static class IEventSqlMapper implements ResultSetMapper<EntitlementEvent> {
+ public static class EventSqlMapper implements ResultSetMapper<EntitlementEvent> {
private DateTime getDate(ResultSet r, String fieldName) throws SQLException {
final Timestamp resultStamp = r.getTimestamp(fieldName);
@@ -135,9 +119,6 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
String priceListName = r.getString("plist_name");
long currentVersion = r.getLong("current_version");
boolean isActive = r.getBoolean("is_active");
- DateTime nextAvailableDate = getDate(r, "processing_available_dt");
- UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
- EventLifecycleState processingState = EventLifecycleState.valueOf(r.getString("processing_state"));
EventBaseBuilder<?> base = ((eventType == EventType.PHASE) ?
new PhaseEventBuilder() :
@@ -148,11 +129,7 @@ public interface EventSqlDao extends Transactional<EventSqlDao>, CloseMe, Transm
.setEffectiveDate(effectiveDate)
.setProcessedDate(createdDate)
.setActiveVersion(currentVersion)
- .setActive(isActive)
- .setProcessingOwner(processingOwner)
- .setNextAvailableProcessingTime(nextAvailableDate)
- .setProcessingState(processingState);
-
+ .setActive(isActive);
EntitlementEvent result = null;
if (eventType == EventType.PHASE) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
index af74752..b7bfece 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EntitlementEvent.java
@@ -21,7 +21,7 @@ import org.joda.time.DateTime;
import java.util.UUID;
-public interface EntitlementEvent extends EventLifecycle, Comparable<EntitlementEvent> {
+public interface EntitlementEvent extends Comparable<EntitlementEvent> {
public enum EventType {
API_USER,
@@ -32,6 +32,16 @@ public interface EntitlementEvent extends EventLifecycle, Comparable<Entitlement
public UUID getId();
+ public long getActiveVersion();
+
+ public void setActiveVersion(long activeVersion);
+
+ public boolean isActive();
+
+ public void deactivate();
+
+ public void reactivate();
+
public DateTime getProcessedDate();
public DateTime getRequestedDate();
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
index 93dc9e1..9420fbf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBase.java
@@ -30,12 +30,8 @@ public abstract class EventBase implements EntitlementEvent {
private final DateTime effectiveDate;
private final DateTime processedDate;
- // Lifecyle of the event
private long activeVersion;
private boolean isActive;
- private UUID processingOwner;
- private DateTime nextAvailableProcessingTime;
- private EventLifecycleState processingState;
public EventBase(EventBaseBuilder<?> builder) {
this.uuid = builder.getUuid();
@@ -46,31 +42,17 @@ public abstract class EventBase implements EntitlementEvent {
this.activeVersion = builder.getActiveVersion();
this.isActive = builder.isActive();
- this.processingOwner = builder.getProcessingOwner();
- this.nextAvailableProcessingTime = builder.getNextAvailableProcessingTime();
- this.processingState = builder.getProcessingState();
}
public EventBase(UUID subscriptionId, DateTime requestedDate,
DateTime effectiveDate, DateTime processedDate,
long activeVersion, boolean isActive) {
- this(subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive, null, null, EventLifecycleState.AVAILABLE);
- }
-
- private EventBase(UUID subscriptionId, DateTime requestedDate,
- DateTime effectiveDate, DateTime processedDate,
- long activeVersion, boolean isActive,
- UUID processingOwner, DateTime nextAvailableProcessingTime,
- EventLifecycleState processingState) {
- this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive,
- processingOwner, nextAvailableProcessingTime, processingState);
+ this(UUID.randomUUID(), subscriptionId, requestedDate, effectiveDate, processedDate, activeVersion, isActive);
}
public EventBase(UUID id, UUID subscriptionId, DateTime requestedDate,
DateTime effectiveDate, DateTime processedDate,
- long activeVersion, boolean isActive,
- UUID processingOwner, DateTime nextAvailableProcessingTime,
- EventLifecycleState processingState) {
+ long activeVersion, boolean isActive) {
this.uuid = id;
this.subscriptionId = subscriptionId;
this.requestedDate = requestedDate;
@@ -79,10 +61,6 @@ public abstract class EventBase implements EntitlementEvent {
this.activeVersion = activeVersion;
this.isActive = isActive;
- this.processingOwner = processingOwner;
- this.nextAvailableProcessingTime = nextAvailableProcessingTime;
- this.processingState = processingState;
-
}
@@ -138,64 +116,9 @@ public abstract class EventBase implements EntitlementEvent {
}
- @Override
- public UUID getOwner() {
- return processingOwner;
- }
-
- @Override
- public void setOwner(UUID owner) {
- this.processingOwner = owner;
- }
-
- @Override
- public DateTime getNextAvailableDate() {
- return nextAvailableProcessingTime;
- }
-
- @Override
- public void setNextAvailableDate(DateTime dateTime) {
- this.nextAvailableProcessingTime = dateTime;
- }
-
-
- @Override
- public EventLifecycleState getProcessingState() {
- return processingState;
- }
-
- @Override
- public void setProcessingState(EventLifecycleState processingState) {
- this.processingState = processingState;
- }
-
- @Override
- public boolean isAvailableForProcessing(DateTime now) {
-
- // Event got deactivated, will never be processed
- if (!isActive) {
- return false;
- }
-
- switch(processingState) {
- case AVAILABLE:
- break;
- case IN_PROCESSING:
- // Somebody already got the event, not available yet
- if (nextAvailableProcessingTime.isAfter(now)) {
- return false;
- }
- break;
- case PROCESSED:
- return false;
- default:
- throw new EntitlementError(String.format("Unkwnon IEvent processing state %s", processingState));
- }
- return effectiveDate.isBefore(now);
- }
//
- // Really used for unit tesrs only as the sql implementation relies on date first and then event insertion
+ // Really used for unit tests only as the sql implementation relies on date first and then event insertion
//
// Order first by:
// - effectiveDate, followed by processedDate, requestedDate
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
index 104fbef..17f5e15 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/EventBaseBuilder.java
@@ -16,7 +16,6 @@
package com.ning.billing.entitlement.events;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import org.joda.time.DateTime;
import java.util.UUID;
@@ -32,15 +31,11 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
private long activeVersion;
private boolean isActive;
- private UUID processingOwner;
- private DateTime nextAvailableProcessingTime;
- private EventLifecycleState processingState;
public EventBaseBuilder() {
this.uuid = UUID.randomUUID();
this.isActive = true;
- this.processingState = EventLifecycleState.AVAILABLE;
}
public EventBaseBuilder(EventBaseBuilder<?> copy) {
@@ -52,9 +47,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
this.activeVersion = copy.activeVersion;
this.isActive = copy.isActive;
- this.processingOwner = copy.processingOwner;
- this.nextAvailableProcessingTime = copy.nextAvailableProcessingTime;
- this.processingState = copy.processingState;
}
public T setUuid(UUID uuid) {
@@ -92,21 +84,6 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
return (T) this;
}
- public T setProcessingOwner(UUID processingOwner) {
- this.processingOwner = processingOwner;
- return (T) this;
- }
-
- public T setNextAvailableProcessingTime(DateTime nextAvailableProcessingTime) {
- this.nextAvailableProcessingTime = nextAvailableProcessingTime;
- return (T) this;
- }
-
- public T setProcessingState(EventLifecycleState processingState) {
- this.processingState = processingState;
- return (T) this;
- }
-
public UUID getUuid() {
return uuid;
}
@@ -134,16 +111,4 @@ public class EventBaseBuilder<T extends EventBaseBuilder<T>> {
public boolean isActive() {
return isActive;
}
-
- public UUID getProcessingOwner() {
- return processingOwner;
- }
-
- public DateTime getNextAvailableProcessingTime() {
- return nextAvailableProcessingTime;
- }
-
- public EventLifecycleState getProcessingState() {
- return processingState;
- }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
index 3f033a4..2438ddf 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/events/user/ApiEventBase.java
@@ -58,17 +58,6 @@ public class ApiEventBase extends EventBase implements ApiEvent {
}
- public ApiEventBase(UUID id, UUID subscriptionId, DateTime processed, String eventPlan, String eventPhase,
- String priceList, DateTime requestedDate, ApiEventType eventType, DateTime effectiveDate, long activeVersion,
- boolean isActive, UUID processingOwner, DateTime nextAvailableProcessingTime,EventLifecycleState processingState) {
- super(id, subscriptionId, requestedDate, effectiveDate, processed, activeVersion, isActive, processingOwner, nextAvailableProcessingTime, processingState);
- this.eventType = eventType;
- this.eventPlan = eventPlan;
- this.eventPlanPhase = eventPhase;
- this.eventPriceList = priceList;
- }
-
-
@Override
public ApiEventType getEventType() {
return eventType;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
index ab04700..a5ab3d8 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/EntitlementModule.java
@@ -30,7 +30,6 @@ import com.ning.billing.entitlement.api.test.EntitlementTestApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.api.user.SubscriptionApiService;
-import com.ning.billing.entitlement.engine.core.DefaultApiEventProcessor;
import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.engine.core.EventNotifier;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -53,10 +52,6 @@ public class EntitlementModule extends AbstractModule {
bind(EntitlementConfig.class).toInstance(config);
}
- protected void installApiEventProcessor() {
- bind(EventNotifier.class).to(DefaultApiEventProcessor.class).asEagerSingleton();
- }
-
protected void installEntitlementDao() {
bind(EntitlementDao.class).to(EntitlementSqlDao.class).asEagerSingleton();
}
@@ -77,7 +72,6 @@ public class EntitlementModule extends AbstractModule {
protected void configure() {
installConfig();
installClock();
- installApiEventProcessor();
installEntitlementDao();
installEntitlementCore();
}
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
index 4540ad8..55ad7f4 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/ddl.sql
@@ -14,24 +14,9 @@ CREATE TABLE events (
plist_name varchar(64) DEFAULT NULL,
current_version int(11) DEFAULT 1,
is_active bool DEFAULT 1,
- processing_owner char(36) DEFAULT NULL,
- processing_available_dt datetime DEFAULT NULL,
- processing_state varchar(14) DEFAULT 'AVAILABLE',
PRIMARY KEY(id)
) ENGINE=innodb;
-DROP TABLE IF EXISTS claimed_events;
-CREATE TABLE claimed_events (
- id int(11) unsigned NOT NULL AUTO_INCREMENT,
- sequence_id int(11) unsigned NOT NULL,
- owner_id char(36) NOT NULL,
- hostname varchar(64) NOT NULL,
- claimed_dt datetime NOT NULL,
- event_id char(36) NOT NULL,
- PRIMARY KEY(id)
-) ENGINE=innodb;
-
-
DROP TABLE IF EXISTS subscriptions;
CREATE TABLE subscriptions (
id char(36) NOT NULL,
diff --git a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
index 31e9eaf..704e2c7 100644
--- a/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
+++ b/entitlement/src/main/resources/com/ning/billing/entitlement/engine/dao/EventSqlDao.sql.stg
@@ -1,8 +1,8 @@
group EventSqlDao;
-getReadyEvents(now, max) ::= <<
- select
- event_id
+getEventById(event_id) ::= <<
+ select
+ event_id
, event_type
, user_type
, created_dt
@@ -14,48 +14,11 @@ getReadyEvents(now, max) ::= <<
, phase_name
, plist_name
, current_version
- , is_active
- , processing_owner
- , processing_available_dt
- , processing_state
- from events
- where
- effective_dt \<= :now
- and is_active = 1
- and processing_state != 'PROCESSED'
- and (processing_owner IS NULL OR processing_available_dt \<= :now)
- order by
- effective_dt asc
- , created_dt asc
- , requested_dt asc
- , id asc
- limit :max
- ;
->>
-
-claimEvent(owner, next_available, event_id, now) ::= <<
- update events
- set
- processing_owner = :owner
- , processing_available_dt = :next_available
- , processing_state = 'IN_PROCESSING'
- where
- event_id = :event_id
- and is_active = 1
- and processing_state != 'PROCESSED'
- and (processing_owner IS NULL OR processing_available_dt \<= :now)
- ;
->>
-
-clearEvent(event_id, owner) ::= <<
- update events
- set
- processing_owner = NULL
- , processing_state = 'PROCESSED'
- where
+ , is_active
+ from events
+ where
event_id = :event_id
- and processing_owner = :owner
- ;
+ ;
>>
insertEvent() ::= <<
@@ -73,9 +36,6 @@ insertEvent() ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
) values (
:event_id
, :event_type
@@ -90,9 +50,6 @@ insertEvent() ::= <<
, :plist_name
, :current_version
, :is_active
- , :processing_owner
- , :processing_available_dt
- , :processing_state
);
>>
@@ -103,22 +60,6 @@ removeEvents(subscription_id) ::= <<
;
>>
-insertClaimedHistory(sequence_id, owner_id, hostname, claimed_dt, event_id) ::= <<
- insert into claimed_events (
- sequence_id
- , owner_id
- , hostname
- , claimed_dt
- , event_id
- ) values (
- :sequence_id
- , :owner_id
- , :hostname
- , :claimed_dt
- , :event_id
- );
->>
-
unactiveEvent(event_id, now) ::= <<
update events
set
@@ -154,9 +95,6 @@ getFutureActiveEventForSubscription(subscription_id, now) ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
from events
where
subscription_id = :subscription_id
@@ -185,9 +123,6 @@ getEventsForSubscription(subscription_id) ::= <<
, plist_name
, current_version
, is_active
- , processing_owner
- , processing_available_dt
- , processing_state
from events
where
subscription_id = :subscription_id
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
index 187c080..87491c7 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiCancelSql.java
@@ -32,7 +32,7 @@ public class TestUserApiCancelSql extends TestUserApiCancel {
return Guice.createInjector(Stage.DEVELOPMENT, new MockEngineModuleSql());
}
- @Test(enabled= true, groups={"stress"})
+ @Test(enabled= false, groups={"stress"})
public void stressTest() {
for (int i = 0; i < MAX_STRESS_ITERATIONS; i++) {
cleanupTest();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
index dc6567f..51c3d8b 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/user/TestUserApiChangePlan.java
@@ -198,6 +198,15 @@ public abstract class TestUserApiChangePlan extends TestApiBase {
testListener.pushExpectedEvent(NextEvent.PHASE);
clock.addDeltaFromReality(currentPhase.getDuration());
DateTime futureNow = clock.getUTCNow();
+
+ /*
+ try {
+ Thread.sleep(1000 * 3000);
+ } catch (Exception e) {
+
+ }
+ */
+
assertTrue(futureNow.isAfter(nextExpectedPhaseChange));
assertTrue(testListener.isCompleted(3000));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
index 74ac1e7..3623da2 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDao.java
@@ -17,5 +17,6 @@
package com.ning.billing.entitlement.engine.dao;
public interface MockEntitlementDao {
+
public void reset();
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 7fc8342..86e458f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -31,16 +31,26 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.entitlement.api.user.SubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
-import com.ning.billing.entitlement.events.EventLifecycle.EventLifecycleState;
import com.ning.billing.entitlement.events.user.ApiEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
+import com.ning.billing.util.notificationq.NotificationLifecycle;
+import com.ning.billing.util.notificationq.NotificationQueue;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlementDao {
@@ -52,15 +62,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
private final Clock clock;
private final EntitlementConfig config;
private final SubscriptionFactory factory;
-
-
+ private final NotificationQueueService notificationQueueService;
@Inject
- public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
+ public MockEntitlementDaoMemory(Clock clock, EntitlementConfig config, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
super();
this.clock = clock;
this.config = config;
this.factory = factory;
+ this.notificationQueueService = notificationQueueService;
this.bundles = new ArrayList<SubscriptionBundle>();
this.subscriptions = new ArrayList<Subscription>();
this.events = new TreeSet<EntitlementEvent>();
@@ -138,6 +148,14 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
synchronized(events) {
events.addAll(initalEvents);
+ for (final EntitlementEvent cur : initalEvents) {
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
+ }
}
Subscription updatedSubscription = buildSubscription(subscription);
subscriptions.add(updatedSubscription);
@@ -174,7 +192,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
List<EntitlementEvent> results = new LinkedList<EntitlementEvent>();
for (EntitlementEvent cur : events) {
if (cur.isActive() &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE &&
+ cur.getEffectiveDate().isAfter(clock.getUTCNow()) &&
cur.getSubscriptionId().equals(subscriptionId)) {
results.add(cur);
}
@@ -202,39 +220,6 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
- @Override
- public List<EntitlementEvent> getEventsReady(UUID ownerId, int sequenceId) {
- synchronized(events) {
- List<EntitlementEvent> readyList = new LinkedList<EntitlementEvent>();
- for (EntitlementEvent cur : events) {
- if (cur.isAvailableForProcessing(clock.getUTCNow())) {
-
- if (cur.getOwner() != null) {
- log.warn(String.format("EventProcessor %s stealing event %s from %s", ownerId, cur, cur.getOwner()));
- }
- cur.setOwner(ownerId);
- cur.setNextAvailableDate(clock.getUTCNow().plus(config.getDaoClaimTimeMs()));
- cur.setProcessingState(EventLifecycleState.IN_PROCESSING);
- readyList.add(cur);
- }
- }
- Collections.sort(readyList);
- return readyList;
- }
- }
-
- @Override
- public void clearEventsReady(UUID ownerId, Collection<EntitlementEvent> cleared) {
- synchronized(events) {
- for (EntitlementEvent cur : cleared) {
- if (cur.getOwner().equals(ownerId)) {
- cur.setProcessingState(EventLifecycleState.PROCESSED);
- } else {
- log.warn(String.format("EventProcessor %s trying to clear event %s that it does not own", ownerId, cur));
- }
- }
- }
- }
private Subscription buildSubscription(SubscriptionData in) {
return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId()));
@@ -272,12 +257,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
cancelNextChangeEvent(subscriptionId);
cancelNextPhaseEvent(subscriptionId);
events.addAll(changeEvents);
+ for (final EntitlementEvent cur : changeEvents) {
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return cur.getId().toString();
+ }
+ });
+ }
}
}
- private void insertEvent(EntitlementEvent event) {
+ private void insertEvent(final EntitlementEvent event) {
synchronized(events) {
events.add(event);
+ recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return event.getId().toString();
+ }
+ });
}
}
@@ -298,10 +297,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
continue;
}
if (cur.getType() == EventType.PHASE &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+ cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
cur.deactivate();
break;
}
+
}
}
}
@@ -319,7 +319,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
if (cur.getType() == EventType.API_USER &&
ApiEventType.CHANGE == ((ApiEvent) cur).getEventType() &&
- cur.getProcessingState() == EventLifecycleState.AVAILABLE) {
+ cur.getEffectiveDate().isAfter(clock.getUTCNow())) {
cur.deactivate();
break;
}
@@ -364,8 +364,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
SubscriptionBundleData bundleData = curBundle.getData();
for (SubscriptionMigrationData curSubscription : curBundle.getSubscriptions()) {
SubscriptionData subData = curSubscription.getData();
- for (EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
+ for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
events.add(curEvent);
+ recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
+ @Override
+ public String toString() {
+ return curEvent.getId().toString();
+ }
+ });
+
}
subscriptions.add(subData);
}
@@ -393,4 +400,26 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
}
}
+
+ @Override
+ public EntitlementEvent getEventById(UUID eventId) {
+ synchronized(events) {
+ for (EntitlementEvent cur : events) {
+ if (cur.getId().equals(eventId)) {
+ return cur;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
+ try {
+ NotificationQueue subscritionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscritionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index 2ebdea9..503fd1a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -20,6 +20,8 @@ import com.google.inject.Inject;
import com.ning.billing.config.EntitlementConfig;
import com.ning.billing.entitlement.api.user.SubscriptionFactory;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService;
+
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
@@ -32,11 +34,12 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
private final ResetSqlDao resetDao;
@Inject
- public MockEntitlementDaoSql(DBI dbi, Clock clock, EntitlementConfig config, SubscriptionFactory factory) {
- super(dbi, clock, config, factory);
+ public MockEntitlementDaoSql(DBI dbi, Clock clock, SubscriptionFactory factory, NotificationQueueService notificationQueueService) {
+ super(dbi, clock, factory, notificationQueueService);
this.resetDao = dbi.onDemand(ResetSqlDao.class);
}
+
@Override
public void reset() {
resetDao.inTransaction(new Transaction<Void, ResetSqlDao>() {
@@ -45,9 +48,10 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
public Void inTransaction(ResetSqlDao dao, TransactionStatus status)
throws Exception {
resetDao.resetEvents();
- resetDao.resetClaimedEvents();
resetDao.resetSubscriptions();
resetDao.resetBundles();
+ resetDao.resetClaimedNotifications();
+ resetDao.resetNotifications();
return null;
}
});
@@ -58,14 +62,17 @@ public class MockEntitlementDaoSql extends EntitlementSqlDao implements MockEnti
@SqlUpdate("truncate table events")
public void resetEvents();
- @SqlUpdate("truncate table claimed_events")
- public void resetClaimedEvents();
-
@SqlUpdate("truncate table subscriptions")
public void resetSubscriptions();
@SqlUpdate("truncate table bundles")
public void resetBundles();
- }
+ @SqlUpdate("truncate table notifications")
+ public void resetNotifications();
+
+ @SqlUpdate("truncate table claimed_notifications")
+ public void resetClaimedNotifications();
+
+ }
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
index dc422a6..85f5e0a 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
@@ -20,6 +20,7 @@ import com.ning.billing.catalog.glue.CatalogModule;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.glue.EventBusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
public class MockEngineModule extends EntitlementModule {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index c6ce269..786f1e3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,16 +17,12 @@
package com.ning.billing.entitlement.glue;
-import com.ning.billing.entitlement.engine.core.EventNotifier;
-import com.ning.billing.entitlement.engine.core.MockApiEventProcessorMemory;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
+import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueService;
public class MockEngineModuleMemory extends MockEngineModule {
- @Override
- protected void installApiEventProcessor() {
- bind(EventNotifier.class).to(MockApiEventProcessorMemory.class).asEagerSingleton();
- }
@Override
protected void installEntitlementDao() {
@@ -34,8 +30,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
}
+ private void installNotificationQueue() {
+ bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ }
@Override
protected void configure() {
super.configure();
+ installNotificationQueue();
}
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
index f1cd237..dbe2938 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
@@ -22,6 +22,8 @@ import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoSql;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.glue.NotificationQueueModule;
+
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
@@ -47,6 +49,7 @@ public class MockEngineModuleSql extends MockEngineModule {
@Override
protected void configure() {
installDBI();
+ install(new NotificationQueueModule());
super.configure();
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 802d16b..818d831 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -97,7 +97,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final String notificationKey = r.getString("notification_key");
final DateTime effectiveDate = getDate(r, "effective_dt");
final DateTime nextAvailableDate = getDate(r, "processing_available_dt");
- final UUID processingOwner = (r.getString("processing_owner") != null) ? UUID.fromString(r.getString("processing_owner")) : null;
+ final String processingOwner = r.getString("processing_owner");
final NotificationLifecycleState processingState = NotificationLifecycleState.valueOf(r.getString("processing_state"));
return new DefaultNotification(id, processingOwner, nextAvailableDate,
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 56a8547..2946e13 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -23,14 +23,14 @@ import org.joda.time.DateTime;
public class DefaultNotification implements Notification {
private final UUID id;
- private final UUID owner;
+ private final String owner;
private final DateTime nextAvailableDate;
private final NotificationLifecycleState lifecycleState;
private final String notificationKey;
private final DateTime effectiveDate;
- public DefaultNotification(UUID id, UUID owner, DateTime nextAvailableDate,
+ public DefaultNotification(UUID id, String owner, DateTime nextAvailableDate,
NotificationLifecycleState lifecycleState,
String notificationKey, DateTime effectiveDate) {
super();
@@ -52,7 +52,7 @@ public class DefaultNotification implements Notification {
}
@Override
- public UUID getOwner() {
+ public String getOwner() {
return owner;
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
new file mode 100644
index 0000000..80f7385
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+public class DefaultNotificationQueue extends NotificationQueueBase {
+
+ protected final NotificationSqlDao dao;
+
+ public DefaultNotificationQueue(final DBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ super(clock, svcName, queueName, handler, config);
+ this.dao = dbi.onDemand(NotificationSqlDao.class);
+ }
+
+ @Override
+ protected void doProcessEvents(int sequenceId) {
+ List<Notification> notifications = getReadyNotifications(sequenceId);
+ for (Notification cur : notifications) {
+ nbProcessedEvents.incrementAndGet();
+ handler.handleReadyNotification(cur.getNotificationKey());
+ }
+ // If anything happens before we get to clear those notifications, somebody else will pick them up
+ clearNotifications(notifications);
+ }
+
+ @Override
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+ final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+ NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
+ Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ transactionalNotificationDao.insertNotification(notification);
+ }
+
+
+ private void clearNotifications(final Collection<Notification> cleared) {
+
+ log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
+ getFullQName(),
+ cleared.size()));
+
+ dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
+
+ @Override
+ public Void inTransaction(NotificationSqlDao transactional,
+ TransactionStatus status) throws Exception {
+ for (Notification cur : cleared) {
+ transactional.clearNotification(cur.getId().toString(), hostname);
+ log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
+ }
+ return null;
+ }
+ });
+ }
+
+ private List<Notification> getReadyNotifications(final int seqId) {
+
+ final Date now = clock.getUTCNow().toDate();
+ final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
+
+ log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow = %s", getFullQName(), now));
+
+ List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
+
+ @Override
+ public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
+ TransactionStatus status) throws Exception {
+
+ List<Notification> claimedNotifications = new ArrayList<Notification>();
+ List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
+ for (Notification cur : input) {
+ final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
+ if (claimed) {
+ claimedNotifications.add(cur);
+ transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
+ }
+ }
+ return claimedNotifications;
+ }
+ });
+
+ for (Notification cur : result) {
+ log.debug(String.format("NotificationQueue %sclaimed events %s",
+ getFullQName(), cur.getId()));
+ if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
+ log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
+ getFullQName(), cur, cur.getOwner()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 6685e05..5181113 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -16,49 +16,26 @@
package com.ning.billing.util.notificationq;
-import java.util.Map;
-import java.util.TreeMap;
-
import org.skife.jdbi.v2.DBI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
import com.ning.billing.util.clock.Clock;
-public class DefaultNotificationQueueService implements NotificationQueueService {
-
- private final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
private final DBI dbi;
- private final Clock clock;
-
- private final Map<String, NotificationQueue> queues;
+ @Inject
public DefaultNotificationQueueService(final DBI dbi, final Clock clock) {
+ super(clock);
this.dbi = dbi;
- this.clock = clock;
- this.queues = new TreeMap<String, NotificationQueue>();
}
@Override
- public NotificationQueue createNotificationQueue(String svcName,
+ protected NotificationQueue createNotificationQueueInternal(String svcName,
String queueName, NotificationQueueHandler handler,
NotificationConfig config) {
- if (svcName == null || queueName == null || handler == null || config == null) {
- throw new RuntimeException("Need to specify all parameters");
- }
-
- String compositeName = svcName + ":" + queueName;
- NotificationQueue result = null;
- synchronized(queues) {
- result = queues.get(compositeName);
- if (result == null) {
- result = new NotificationQueue(dbi, clock, svcName, queueName, handler, config);
- queues.put(compositeName, result);
- } else {
- log.warn("Queue for svc {} and name {} already exist", svcName, queueName);
- }
- }
- return result;
+ return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config);
}
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
index 7c81108..3200424 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationLifecycle.java
@@ -16,8 +16,6 @@
package com.ning.billing.util.notificationq;
-import java.util.UUID;
-
import org.joda.time.DateTime;
@@ -29,17 +27,12 @@ public interface NotificationLifecycle {
PROCESSED
}
- public UUID getOwner();
-
- //public void setOwner(UUID owner);
+ public String getOwner();
public DateTime getNextAvailableDate();
- //public void setNextAvailableDate(DateTime dateTime);
-
public NotificationLifecycleState getProcessingState();
- //public void setProcessingState(NotificationLifecycleState procesingState);
public boolean isAvailableForProcessing(DateTime now);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 5677335..23f0de0 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,260 +16,44 @@
package com.ning.billing.util.notificationq;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Transaction;
-import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.ning.billing.util.Hostname;
-import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
-
-
-public class NotificationQueue {
-
- protected final static Logger log = LoggerFactory.getLogger(NotificationQueue.class);
-
- private static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
- private final long STOP_WAIT_TIMEOUT_MS = 60000;
-
- private final String svcName;
- private final String queueName;
- private final NotificationQueueHandler handler;
- private final NotificationConfig config;
- private final NotificationSqlDao dao;
- private final Executor executor;
- private final Clock clock;
- private final String hostname;
-
- private static final AtomicInteger sequenceId = new AtomicInteger();
-
- protected AtomicLong nbProcessedEvents;
-
- // Use this object's monitor for synchronization (no need for volatile)
- protected boolean isProcessingEvents;
-
- // Package visibility on purpose
- NotificationQueue(final DBI dbi, final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
- this.clock = clock;
- this.svcName = svcName;
- this.queueName = queueName;
- this.handler = handler;
- this.config = config;
- this.hostname = Hostname.get();
- this.dao = dbi.onDemand(NotificationSqlDao.class);
-
- this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread th = new Thread(r);
- th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
- th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error("Uncaught exception for thread " + t.getName(), e);
- }
- });
- return th;
- }
- });
- }
-
- /**
- *
- * Record from within a transaction the need to be called back when the notification is ready
- *
- * @param transactionalDao the transactionalDao
- * @param futureNotificationTime the time at which the notificatoin is ready
- * @param notificationKey the key for that notification
- */
- public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
- final DateTime futureNotificationTime, final NotificationKey notificationKey) {
- NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
- Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
- transactionalNotificationDao.insertNotification(notification);
- }
+public interface NotificationQueue {
/**
- * Stops the queue.
- *
- * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
- */
- public void stopQueue() {
- if (config.isNotificationProcessingOff()) {
- handler.completedQueueStop();
- return;
- }
-
- synchronized(this) {
- isProcessingEvents = false;
- try {
- log.info("NotificationQueue requested to stop");
- wait(STOP_WAIT_TIMEOUT_MS);
- log.info("NotificationQueue requested should have exited");
- } catch (InterruptedException e) {
- log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
- }
- }
-
- }
-
- /**
- * Starts the queue.
- *
- * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
- */
- public void startQueue() {
-
- this.isProcessingEvents = true;
- this.nbProcessedEvents = new AtomicLong();
-
-
- if (config.isNotificationProcessingOff()) {
- log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
- handler.completedQueueStart();
- return;
- }
- final NotificationQueue notificationQueue = this;
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
-
- log.info(String.format("NotificationQueue thread %s [%d] started",
- Thread.currentThread().getName(),
- Thread.currentThread().getId()));
-
- // Thread is now started, notify the listener
- handler.completedQueueStart();
-
- try {
- while (true) {
-
- synchronized (notificationQueue) {
- if (!isProcessingEvents) {
- log.info(String.format("NotificationQueue has been requested to stop, thread %s [%d] stopping...",
- Thread.currentThread().getName(),
- Thread.currentThread().getId()));
- notificationQueue.notify();
- break;
- }
- }
-
- // Callback may trigger exceptions in user code so catch anything here and live with it.
- try {
- doProcessEvents(sequenceId.getAndIncrement());
- } catch (Exception e) {
- log.error(String.format("NotificationQueue thread %s [%d] got an exception..",
- Thread.currentThread().getName(),
- Thread.currentThread().getId()), e);
- }
- sleepALittle();
- }
- } catch (InterruptedException e) {
- log.warn(Thread.currentThread().getName() + " got interrupted ", e);
- } catch (Throwable e) {
- log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
- // Just to make it really obvious in the log
- e.printStackTrace();
- } finally {
- handler.completedQueueStop();
- log.info(String.format("NotificationQueue thread %s [%d] exited...",
- Thread.currentThread().getName(),
- Thread.currentThread().getId()));
- }
- }
-
- private void sleepALittle() throws InterruptedException {
- Thread.sleep(config.getNotificationSleepTimeMs());
- }
- });
- }
-
- private void doProcessEvents(int sequenceId) {
- List<Notification> notifications = getReadyNotifications(sequenceId);
- for (Notification cur : notifications) {
- nbProcessedEvents.incrementAndGet();
- handler.handleReadyNotification(cur.getNotificationKey());
- }
- // If anything happens before we get to clear those notifications, somebody else will pick them up
- clearNotifications(notifications);
- }
-
- private String getFullQName() {
- return svcName + ":" + queueName;
- }
-
- private void clearNotifications(final Collection<Notification> cleared) {
-
- log.debug(String.format("NotificationQueue %s clearEventsReady START cleared size = %d",
- getFullQName(),
- cleared.size()));
-
- dao.inTransaction(new Transaction<Void, NotificationSqlDao>() {
-
- @Override
- public Void inTransaction(NotificationSqlDao transactional,
- TransactionStatus status) throws Exception {
- for (Notification cur : cleared) {
- transactional.clearNotification(cur.getId().toString(), hostname);
- log.debug(String.format("NotificationQueue %s cleared events %s", getFullQName(), cur.getId()));
- }
- return null;
- }
- });
- }
-
- private List<Notification> getReadyNotifications(final int seqId) {
-
- final Date now = clock.getUTCNow().toDate();
- final Date nextAvailable = clock.getUTCNow().plus(config.getDaoClaimTimeMs()).toDate();
-
- log.debug(String.format("NotificationQueue %s getEventsReady START effectiveNow = %s", getFullQName(), now));
-
- List<Notification> result = dao.inTransaction(new Transaction<List<Notification>, NotificationSqlDao>() {
-
- @Override
- public List<Notification> inTransaction(NotificationSqlDao transactionalDao,
- TransactionStatus status) throws Exception {
+ *
+ * Record from within a transaction the need to be called back when the notification is ready
+ *
+ * @param transactionalDao the transactionalDao
+ * @param futureNotificationTime the time at which the notification is ready
+ * @param notificationKey the key for that notification
+ */
+ public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
+ final DateTime futureNotificationTime, final NotificationKey notificationKey);
+
+ /**
+ * This is only valid when the queue has been configured with isNotificationProcessingOff is true
+ * In which case, it will callback users for all the ready notifications.
+ *
+ */
+ public void processReadyNotification();
+
+ /**
+ * Stops the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStop to be notified when the notification thread exited
+ */
+ public void stopQueue();
+
+ /**
+ * Starts the queue.
+ *
+ * @see NotificationQueueHandler.completedQueueStart to be notified when the notification thread started
+ */
+ public void startQueue();
- List<Notification> claimedNotifications = new ArrayList<Notification>();
- List<Notification> input = transactionalDao.getReadyNotifications(now, config.getDaoMaxReadyEvents());
- for (Notification cur : input) {
- final boolean claimed = (transactionalDao.claimNotification(hostname, nextAvailable, cur.getId().toString(), now) == 1);
- if (claimed) {
- claimedNotifications.add(cur);
- transactionalDao.insertClaimedHistory(seqId, hostname, now, cur.getId().toString());
- }
- }
- return claimedNotifications;
- }
- });
- for (Notification cur : result) {
- log.debug(String.format("NotificationQueue %sclaimed events %s",
- getFullQName(), cur.getId()));
- if (cur.getOwner() != null && !cur.getOwner().equals(hostname)) {
- log.warn(String.format("NotificationQueue %s stealing notification %s from %s",
- getFullQName(), cur, cur.getOwner()));
- }
- }
- return result;
- }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
new file mode 100644
index 0000000..6eaf33f
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueBase.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Transaction;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+
+
+public abstract class NotificationQueueBase implements NotificationQueue {
+
+ protected final static Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
+
+ protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
+ protected final long STOP_WAIT_TIMEOUT_MS = 60000;
+
+ protected final String svcName;
+ protected final String queueName;
+ protected final NotificationQueueHandler handler;
+ protected final NotificationConfig config;
+
+ protected final Executor executor;
+ protected final Clock clock;
+ protected final String hostname;
+
+ protected static final AtomicInteger sequenceId = new AtomicInteger();
+
+ protected AtomicLong nbProcessedEvents;
+
+ // Use this object's monitor for synchronization (no need for volatile)
+ protected boolean isProcessingEvents;
+
+ // Package visibility on purpose
+ NotificationQueueBase(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ this.clock = clock;
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.config = config;
+ this.hostname = Hostname.get();
+
+ this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread th = new Thread(r);
+ th.setName(NOTIFICATION_THREAD_PREFIX + svcName + "-" + queueName);
+ th.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("Uncaught exception for thread " + t.getName(), e);
+ }
+ });
+ return th;
+ }
+ });
+ }
+
+
+ @Override
+ public void processReadyNotification() {
+ // STEPH to be implemented
+ }
+
+
+ @Override
+ public void stopQueue() {
+ if (config.isNotificationProcessingOff()) {
+ handler.completedQueueStop();
+ return;
+ }
+
+ synchronized(this) {
+ isProcessingEvents = false;
+ try {
+ log.info("NotificationQueue requested to stop");
+ wait(STOP_WAIT_TIMEOUT_MS);
+ log.info("NotificationQueue requested should have exited");
+ } catch (InterruptedException e) {
+ log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void startQueue() {
+
+ this.isProcessingEvents = true;
+ this.nbProcessedEvents = new AtomicLong();
+
+
+ if (config.isNotificationProcessingOff()) {
+ log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
+ handler.completedQueueStart();
+ return;
+ }
+ final NotificationQueueBase notificationQueue = this;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+
+ log.info(String.format("NotificationQueue thread %s [%d] started",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+
+ // Thread is now started, notify the listener
+ handler.completedQueueStart();
+
+ try {
+ while (true) {
+
+ synchronized (notificationQueue) {
+ if (!isProcessingEvents) {
+ log.info(String.format("NotificationQueue has been requested to stop, thread %s [%d] stopping...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ notificationQueue.notify();
+ break;
+ }
+ }
+
+ // Callback may trigger exceptions in user code so catch anything here and live with it.
+ try {
+ doProcessEvents(sequenceId.getAndIncrement());
+ } catch (Exception e) {
+ log.error(String.format("NotificationQueue thread %s [%d] got an exception..",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()), e);
+ }
+ sleepALittle();
+ }
+ } catch (InterruptedException e) {
+ log.warn(Thread.currentThread().getName() + " got interrupted ", e);
+ } catch (Throwable e) {
+ log.error(Thread.currentThread().getName() + " got an exception exiting...", e);
+ // Just to make it really obvious in the log
+ e.printStackTrace();
+ } finally {
+ handler.completedQueueStop();
+ log.info(String.format("NotificationQueue thread %s [%d] exited...",
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ }
+ }
+
+ private void sleepALittle() throws InterruptedException {
+ Thread.sleep(config.getNotificationSleepTimeMs());
+ }
+ });
+ }
+
+
+ protected String getFullQName() {
+ return svcName + ":" + queueName;
+ }
+
+ protected abstract void doProcessEvents(int sequenceId);
+}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d1544c8..a18906b 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,6 +16,8 @@
package com.ning.billing.util.notificationq;
+import java.util.NoSuchElementException;
+
public interface NotificationQueueService {
@@ -37,6 +39,22 @@ public interface NotificationQueueService {
public void completedQueueStop();
}
+ public static final class NotficationQueueAlreadyExists extends Exception {
+ private static final long serialVersionUID = 1541281L;
+
+ public NotficationQueueAlreadyExists(String msg) {
+ super(msg);
+ }
+ }
+
+ public static final class NoSuchNotificationQueue extends Exception {
+ private static final long serialVersionUID = 1541281L;
+
+ public NoSuchNotificationQueue(String msg) {
+ super(msg);
+ }
+ }
+
/**
* Creates a new NotificationQueue for a given associated with the given service and queueName
*
@@ -46,6 +64,23 @@ public interface NotificationQueueService {
* @param config the notification queue configuration
*
* @return a new NotificationQueue
+ *
+ * @throws NotficationQueueAlreadyExists is the queue associated with that service and name already exits
+ *
*/
- NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config);
+ NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+ throws NotficationQueueAlreadyExists;
+
+ /**
+ * Retrieves an already created NotificationQueue by service and name if it exists
+ *
+ * @param svcName
+ * @param queueName
+ * @return
+ *
+ * @throws NoSuchNotificationQueue if queue does not exist
+ */
+ NotificationQueue getNotificationQueue(final String svcName, final String queueName)
+ throws NoSuchNotificationQueue;
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
new file mode 100644
index 0000000..a4dc64e
--- /dev/null
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.ning.billing.util.clock.Clock;
+
+public abstract class NotificationQueueServiceBase implements NotificationQueueService {
+
+ protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+
+ protected final Clock clock;
+
+ private final Map<String, NotificationQueue> queues;
+
+ @Inject
+ public NotificationQueueServiceBase(final Clock clock) {
+
+ this.clock = clock;
+ this.queues = new TreeMap<String, NotificationQueue>();
+ }
+
+ @Override
+ public NotificationQueue createNotificationQueue(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config) throws NotficationQueueAlreadyExists {
+ if (svcName == null || queueName == null || handler == null || config == null) {
+ throw new RuntimeException("Need to specify all parameters");
+ }
+
+ String compositeName = getCompositeName(svcName, queueName);
+ NotificationQueue result = null;
+ synchronized(queues) {
+ result = queues.get(compositeName);
+ if (result != null) {
+ throw new NotficationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
+ svcName, queueName));
+ }
+ result = createNotificationQueueInternal(svcName, queueName, handler, config);
+ queues.put(compositeName, result);
+ }
+ return result;
+ }
+
+ @Override
+ public NotificationQueue getNotificationQueue(String svcName,
+ String queueName) throws NoSuchNotificationQueue {
+
+ NotificationQueue result = null;
+ String compositeName = getCompositeName(svcName, queueName);
+ synchronized(queues) {
+ result = queues.get(compositeName);
+ if (result == null) {
+ throw new NoSuchNotificationQueue(String.format("Queue for svc %s and name %s does not exist",
+ svcName, queueName));
+ }
+ }
+ return result;
+ }
+
+
+ protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
+ String queueName, NotificationQueueHandler handler,
+ NotificationConfig config);
+
+
+ private String getCompositeName(String svcName, String queueName) {
+ return svcName + ":" + queueName;
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
new file mode 100644
index 0000000..11721c8
--- /dev/null
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010-2011 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.util.notificationq;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.joda.time.DateTime;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+
+import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationLifecycle.NotificationLifecycleState;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+
+public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+
+
+ private TreeSet<Notification> notifications;
+
+ public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
+ super(clock, svcName, queueName, handler, config);
+ notifications = new TreeSet<Notification>(new Comparator<Notification>() {
+ @Override
+ public int compare(Notification o1, Notification o2) {
+ if (o1.getEffectiveDate().equals(o2.getEffectiveDate())) {
+ return o1.getNotificationKey().compareTo(o2.getNotificationKey());
+ } else {
+ return o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+ }
+ }
+ });
+ }
+
+ @Override
+ public void recordFutureNotificationFromTransaction(
+ Transmogrifier transactionalDao, DateTime futureNotificationTime,
+ NotificationKey notificationKey) {
+ Notification notification = new DefaultNotification(notificationKey.toString(), futureNotificationTime);
+ synchronized(notifications) {
+ notifications.add(notification);
+ }
+ }
+
+ @Override
+ protected void doProcessEvents(int sequenceId) {
+
+ List<Notification> processedNotifications = new ArrayList<Notification>();
+ synchronized(notifications) {
+ Iterator<Notification> it = notifications.iterator();
+ while (it.hasNext()) {
+ Notification cur = it.next();
+ if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ handler.handleReadyNotification(cur.getNotificationKey());
+ DefaultNotification processedNotification = new DefaultNotification(cur.getId(), hostname, clock.getUTCNow().plus(config.getDaoClaimTimeMs()), NotificationLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+ it.remove();
+ processedNotifications.add(processedNotification);
+ }
+ }
+ if (processedNotifications.size() > 0) {
+ notifications.addAll(processedNotifications);
+ }
+ }
+ }
+}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 7129351..2e3bb3c 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -116,7 +116,7 @@ public class TestNotificationQueue {
public void testSimpleQueueDisabled() throws InterruptedException {
final TestStartStop testStartStop = new TestStartStop(false, false);
- NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "dead",
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "dead",
new NotificationQueueHandler() {
@Override
public void handleReadyNotification(String notificationKey) {
@@ -134,7 +134,7 @@ public class TestNotificationQueue {
executeTest(testStartStop, queue, new WithTest() {
@Override
- public void test(final NotificationQueue readyQueue) throws InterruptedException {
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
// Do nothing
}
});
@@ -153,7 +153,7 @@ public class TestNotificationQueue {
final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
final TestStartStop testStartStop = new TestStartStop(false, false);
- NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "foo",
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
new NotificationQueueHandler() {
@Override
public void handleReadyNotification(String notificationKey) {
@@ -176,7 +176,7 @@ public class TestNotificationQueue {
executeTest(testStartStop, queue, new WithTest() {
@Override
- public void test(final NotificationQueue readyQueue) throws InterruptedException {
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
final UUID key = UUID.randomUUID();
final DummyObject obj = new DummyObject("foo", key);
@@ -229,7 +229,7 @@ public class TestNotificationQueue {
final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
final TestStartStop testStartStop = new TestStartStop(false, false);
- NotificationQueue queue = new NotificationQueue(dbi, clock, "test-svc", "many",
+ DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
new NotificationQueueHandler() {
@Override
public void handleReadyNotification(String notificationKey) {
@@ -252,7 +252,7 @@ public class TestNotificationQueue {
executeTest(testStartStop, queue, new WithTest() {
@Override
- public void test(final NotificationQueue readyQueue) throws InterruptedException {
+ public void test(final DefaultNotificationQueue readyQueue) throws InterruptedException {
final DateTime now = clock.getUTCNow();
final int MAX_NOTIFICATIONS = 100;
@@ -388,11 +388,11 @@ public class TestNotificationQueue {
}
private interface WithTest {
- public void test(NotificationQueue readyQueue) throws InterruptedException;
+ public void test(DefaultNotificationQueue readyQueue) throws InterruptedException;
}
private void executeTest(final TestStartStop testStartStop,
- NotificationQueue queue, WithTest test) throws InterruptedException{
+ DefaultNotificationQueue queue, WithTest test) throws InterruptedException{
queue.startQueue();
boolean started = testStartStop.waitForStartComplete(3000);