killbill-aplcache
Changes
entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java 20(+6 -14)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java 146(+75 -71)
entitlement/src/test/java/com/ning/billing/entitlement/engine/core/TestEntitlementNotificationKey.java 45(+0 -45)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java 39(+9 -30)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java 78(+42 -36)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java 10(+4 -6)
Details
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 3e8c025..a4a926e 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.swing.text.html.HTMLDocument.HTMLReader.IsindexAction;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +61,7 @@ import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -87,11 +90,11 @@ public class Engine implements EventListener, EntitlementService {
@Inject
public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
- final EntitlementConfig config,
- final AddonUtils addonUtils, final Bus eventBus,
- final NotificationQueueService notificationQueueService,
- final SubscriptionFactory subscriptionFactory,
- final CallContextFactory factory) {
+ final EntitlementConfig config,
+ final AddonUtils addonUtils, final Bus eventBus,
+ final NotificationQueueService notificationQueueService,
+ final SubscriptionFactory subscriptionFactory,
+ final CallContextFactory factory) {
super();
this.clock = clock;
this.dao = dao;
@@ -114,35 +117,40 @@ public class Engine implements EventListener, EntitlementService {
try {
subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
- NOTIFICATION_QUEUE_NAME,
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String inputKey, final DateTime eventDateTime) {
-
- final EntitlementNotificationKey key = new EntitlementNotificationKey(inputKey);
- final EntitlementEvent event = dao.getEventById(key.getEventId());
- if (event == null) {
- log.warn("Failed to extract event for notification key {}", inputKey);
- return;
- }
- final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
- final CallContext context = factory.createCallContext("SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
- processEventReady(event, key.getSeqId(), context);
- }
- },
- new NotificationConfig() {
-
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- }
- );
+ NOTIFICATION_QUEUE_NAME,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+
+ if (! (inputKey instanceof EntitlementNotificationKey)) {
+ log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
+ return;
+ }
+ EntitlementNotificationKey key = (EntitlementNotificationKey) inputKey;
+
+ final EntitlementEvent event = dao.getEventById(key.getEventId());
+ if (event == null) {
+ log.warn("Failed to extract event for notification key {}", inputKey);
+ return;
+ }
+ final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
+ final CallContext context = factory.createCallContext("SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+ processEventReady(event, key.getSeqId(), context);
+ }
+ },
+ new NotificationConfig() {
+
+ @Override
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
+ }
+
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isNotificationProcessingOff();
+ }
+ }
+ );
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
@@ -202,10 +210,10 @@ public class Engine implements EventListener, EntitlementService {
final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, now);
final PhaseEvent nextPhaseEvent = (nextTimedPhase != null) ?
PhaseEventData.createNextPhaseEvent(nextTimedPhase.getPhase().getName(), subscription, now, nextTimedPhase.getStartPhase()) :
- null;
- if (nextPhaseEvent != null) {
- dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent, context);
- }
+ null;
+ if (nextPhaseEvent != null) {
+ dao.createNextPhaseEvent(subscription.getId(), nextPhaseEvent, context);
+ }
} catch (EntitlementError e) {
log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
}
@@ -238,13 +246,13 @@ public class Engine implements EventListener, EntitlementService {
// Perform AO cancellation using the effectiveDate of the BP
//
final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
- .setSubscriptionId(cur.getId())
- .setActiveVersion(cur.getActiveVersion())
- .setProcessedDate(now)
- .setEffectiveDate(event.getEffectiveDate())
- .setRequestedDate(now)
- .setUserToken(context.getUserToken())
- .setFromDisk(true));
+ .setSubscriptionId(cur.getId())
+ .setActiveVersion(cur.getActiveVersion())
+ .setProcessedDate(now)
+ .setEffectiveDate(event.getEffectiveDate())
+ .setRequestedDate(now)
+ .setUserToken(context.getUserToken())
+ .setFromDisk(true));
addOnCancellations.put(cur.getId(), cancelEvent);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
index ee35526..1327a72 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EntitlementNotificationKey.java
@@ -17,16 +17,19 @@ package com.ning.billing.entitlement.engine.core;
import java.util.UUID;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.ning.billing.util.notificationq.NotificationKey;
public class EntitlementNotificationKey implements NotificationKey {
- private static final String DELIMITER = ":";
-
private final UUID eventId;
private final int seqId;
- public EntitlementNotificationKey(final UUID eventId, final int seqId) {
+
+ @JsonCreator
+ public EntitlementNotificationKey(@JsonProperty("eventId") final UUID eventId,
+ @JsonProperty("seqId") final int seqId) {
this.eventId = eventId;
this.seqId = seqId;
}
@@ -35,17 +38,6 @@ public class EntitlementNotificationKey implements NotificationKey {
this(eventId, 0);
}
- public EntitlementNotificationKey(final String input) {
-
- final String[] parts = input.split(DELIMITER);
- eventId = UUID.fromString(parts[0]);
- if (parts.length == 2) {
- seqId = Integer.valueOf(parts[1]);
- } else {
- seqId = 0;
- }
- }
-
public UUID getEventId() {
return eventId;
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index fab86f4..7316c3b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -17,6 +17,8 @@
package com.ning.billing.entitlement.engine.dao;
import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -89,8 +91,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Inject
public AuditedEntitlementDao(final IDBI dbi, final Clock clock,
- final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
- final Bus eventBus) {
+ final AddonUtils addonUtils, final NotificationQueueService notificationQueueService,
+ final Bus eventBus) {
this.clock = clock;
this.subscriptionsDao = dbi.onDemand(SubscriptionSqlDao.class);
@@ -188,7 +190,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
@Override
public Void inTransaction(final SubscriptionSqlDao transactionalDao,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
final String subscriptionId = subscription.getId().toString();
transactionalDao.updateChargedThroughDate(subscription.getId().toString(), ctd, context);
final Long subscriptionRecordId = transactionalDao.getRecordId(subscriptionId);
@@ -220,8 +222,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(audit, context);
recordFutureNotificationFromTransaction(transactional,
- nextPhase.getEffectiveDate(),
- new EntitlementNotificationKey(nextPhase.getId()));
+ nextPhase.getEffectiveDate(),
+ new EntitlementNotificationKey(nextPhase.getId()));
return null;
}
});
@@ -242,7 +244,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
return subscriptionsDao.inTransaction(new Transaction<Map<UUID, List<EntitlementEvent>>, SubscriptionSqlDao>() {
@Override
public Map<UUID, List<EntitlementEvent>> inTransaction(final SubscriptionSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
final List<Subscription> subscriptions = transactional.getSubscriptionsFromBundleId(bundleId.toString());
if (subscriptions.size() == 0) {
return Collections.emptyMap();
@@ -266,13 +268,13 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Override
public void createSubscription(final SubscriptionData subscription,
- final List<EntitlementEvent> initialEvents, final CallContext context) {
+ final List<EntitlementEvent> initialEvents, final CallContext context) {
subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
@Override
public Void inTransaction(final SubscriptionSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
transactional.insertSubscription(subscription, context);
final Long subscriptionRecordId = transactional.getRecordId(subscription.getId().toString());
@@ -288,8 +290,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
final Long recordId = eventsDaoFromSameTransaction.getRecordId(cur.getId().toString());
audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
recordFutureNotificationFromTransaction(transactional,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()));
+ cur.getEffectiveDate(),
+ new EntitlementNotificationKey(cur.getId()));
}
eventsDaoFromSameTransaction.insertAuditFromTransaction(audits, context);
@@ -300,12 +302,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Override
public void recreateSubscription(final UUID subscriptionId,
- final List<EntitlementEvent> recreateEvents, final CallContext context) {
+ final List<EntitlementEvent> recreateEvents, final CallContext context) {
eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
@Override
public Void inTransaction(final EntitlementEventSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
final List<EntityAudit> audits = new ArrayList<EntityAudit>();
for (final EntitlementEvent cur : recreateEvents) {
@@ -313,8 +315,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
final Long recordId = transactional.getRecordId(cur.getId().toString());
audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
recordFutureNotificationFromTransaction(transactional,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()));
+ cur.getEffectiveDate(),
+ new EntitlementNotificationKey(cur.getId()));
}
@@ -330,7 +332,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
eventsDao.inTransaction(new Transaction<Void, EntitlementEventSqlDao>() {
@Override
public Void inTransaction(final EntitlementEventSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
cancelNextCancelEventFromTransaction(subscriptionId, transactional, context);
cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
@@ -342,8 +344,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(audit, context);
recordFutureNotificationFromTransaction(transactional,
- cancelEvent.getEffectiveDate(),
- new EntitlementNotificationKey(cancelEvent.getId(), seqId));
+ cancelEvent.getEffectiveDate(),
+ new EntitlementNotificationKey(cancelEvent.getId(), seqId));
return null;
}
});
@@ -356,7 +358,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Override
public Void inTransaction(final EntitlementEventSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
EntitlementEvent cancelledEvent = null;
final Date now = clock.getUTCNow().toDate();
@@ -384,8 +386,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
final Long recordId = transactional.getRecordId(cur.getId().toString());
eventAudits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
recordFutureNotificationFromTransaction(transactional,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()));
+ cur.getEffectiveDate(),
+ new EntitlementNotificationKey(cur.getId()));
}
transactional.insertAuditFromTransaction(eventAudits, context);
@@ -410,8 +412,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
eventAudits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
recordFutureNotificationFromTransaction(transactional,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()));
+ cur.getEffectiveDate(),
+ new EntitlementNotificationKey(cur.getId()));
}
transactional.insertAuditFromTransaction(eventAudits, context);
@@ -433,8 +435,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
}
private void cancelFutureEventFromTransaction(final UUID subscriptionId, final EntitlementEventSqlDao dao,
- final EventType type, @Nullable final ApiEventType apiType,
- final CallContext context) {
+ final EventType type, @Nullable final ApiEventType apiType,
+ final CallContext context) {
EntitlementEvent futureEvent = null;
final Date now = clock.getUTCNow().toDate();
@@ -445,7 +447,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
if (futureEvent != null) {
throw new EntitlementError(
String.format("Found multiple future events for type %s for subscriptions %s",
- type, subscriptionId.toString()));
+ type, subscriptionId.toString()));
}
futureEvent = cur;
}
@@ -504,46 +506,46 @@ public class AuditedEntitlementDao implements EntitlementDao {
Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
switch (cur.getCategory()) {
- case BASE:
- final Collection<EntitlementEvent> futureApiEvents = Collections2.filter(events, new Predicate<EntitlementEvent>() {
- @Override
- public boolean apply(final EntitlementEvent input) {
- return (input.getEffectiveDate().isAfter(clock.getUTCNow()) &&
- ((input instanceof ApiEventCancel) || (input instanceof ApiEventChange)));
- }
- });
- futureBaseEvent = (futureApiEvents.size() == 0) ? null : futureApiEvents.iterator().next();
- break;
-
- case ADD_ON:
- final Plan targetAddOnPlan = reloaded.getCurrentPlan();
- final String baseProductName = (futureBaseEvent instanceof ApiEventChange) ?
- ((ApiEventChange) futureBaseEvent).getEventPlan() : null;
-
- final boolean createCancelEvent = (futureBaseEvent != null) &&
- ((futureBaseEvent instanceof ApiEventCancel) ||
- ((!addonUtils.isAddonAvailableFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan)) ||
- (addonUtils.isAddonIncludedFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan))));
-
- if (createCancelEvent) {
- final DateTime now = clock.getUTCNow();
- final EntitlementEvent addOnCancelEvent = new ApiEventCancel(new ApiEventBuilder()
- .setSubscriptionId(reloaded.getId())
- .setActiveVersion(((SubscriptionData) reloaded).getActiveVersion())
- .setProcessedDate(now)
- .setEffectiveDate(futureBaseEvent.getEffectiveDate())
- .setRequestedDate(now)
- // This event is only there to indicate the ADD_ON is future canceled, but it is not there
- // on disk until the base plan cancellation becomes effective
- .setFromDisk(false));
-
- events.add(addOnCancelEvent);
- // Finally reload subscription with full set of events
- reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ case BASE:
+ final Collection<EntitlementEvent> futureApiEvents = Collections2.filter(events, new Predicate<EntitlementEvent>() {
+ @Override
+ public boolean apply(final EntitlementEvent input) {
+ return (input.getEffectiveDate().isAfter(clock.getUTCNow()) &&
+ ((input instanceof ApiEventCancel) || (input instanceof ApiEventChange)));
}
- break;
- default:
- break;
+ });
+ futureBaseEvent = (futureApiEvents.size() == 0) ? null : futureApiEvents.iterator().next();
+ break;
+
+ case ADD_ON:
+ final Plan targetAddOnPlan = reloaded.getCurrentPlan();
+ final String baseProductName = (futureBaseEvent instanceof ApiEventChange) ?
+ ((ApiEventChange) futureBaseEvent).getEventPlan() : null;
+
+ final boolean createCancelEvent = (futureBaseEvent != null) &&
+ ((futureBaseEvent instanceof ApiEventCancel) ||
+ ((!addonUtils.isAddonAvailableFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan)) ||
+ (addonUtils.isAddonIncludedFromPlanName(baseProductName, futureBaseEvent.getEffectiveDate(), targetAddOnPlan))));
+
+ if (createCancelEvent) {
+ final DateTime now = clock.getUTCNow();
+ final EntitlementEvent addOnCancelEvent = new ApiEventCancel(new ApiEventBuilder()
+ .setSubscriptionId(reloaded.getId())
+ .setActiveVersion(((SubscriptionData) reloaded).getActiveVersion())
+ .setProcessedDate(now)
+ .setEffectiveDate(futureBaseEvent.getEffectiveDate())
+ .setRequestedDate(now)
+ // This event is only there to indicate the ADD_ON is future canceled, but it is not there
+ // on disk until the base plan cancellation becomes effective
+ .setFromDisk(false));
+
+ events.add(addOnCancelEvent);
+ // Finally reload subscription with full set of events
+ reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ }
+ break;
+ default:
+ break;
}
result.add(reloaded);
@@ -558,7 +560,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Override
public Void inTransaction(final EntitlementEventSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
final SubscriptionSqlDao transSubDao = transactional.become(SubscriptionSqlDao.class);
final BundleSqlDao transBundleDao = transactional.become(BundleSqlDao.class);
@@ -579,8 +581,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
audits.add(new EntityAudit(TableName.SUBSCRIPTION_EVENTS, recordId, ChangeType.INSERT));
recordFutureNotificationFromTransaction(transactional,
- curEvent.getEffectiveDate(),
- new EntitlementNotificationKey(curEvent.getId()));
+ curEvent.getEffectiveDate(),
+ new EntitlementNotificationKey(curEvent.getId()));
}
transSubDao.insertSubscription(subData, context);
recordId = transSubDao.getRecordId(subData.getId().toString());
@@ -604,7 +606,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
@Override
public Void inTransaction(final SubscriptionSqlDao transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
final EntitlementEventSqlDao transEventDao = transactional.become(EntitlementEventSqlDao.class);
for (final SubscriptionDataRepair cur : inRepair) {
@@ -616,8 +618,8 @@ public class AuditedEntitlementDao implements EntitlementDao {
transEventDao.insertEvent(event, context);
if (event.getEffectiveDate().isAfter(clock.getUTCNow())) {
recordFutureNotificationFromTransaction(transactional,
- event.getEffectiveDate(),
- new EntitlementNotificationKey(event.getId()));
+ event.getEffectiveDate(),
+ new EntitlementNotificationKey(event.getId()));
}
}
}
@@ -645,10 +647,12 @@ public class AuditedEntitlementDao implements EntitlementDao {
private void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao, final DateTime effectiveDate, final NotificationKey notificationKey) {
try {
final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
- Engine.NOTIFICATION_QUEUE_NAME);
+ Engine.NOTIFICATION_QUEUE_NAME);
subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
} catch (NoSuchNotificationQueue e) {
throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 5b905ac..ec5c17e 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.engine.dao;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -46,6 +47,7 @@ import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.engine.core.Engine;
+import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.EntitlementEvent.EventType;
import com.ning.billing.entitlement.events.user.ApiEvent;
@@ -159,12 +161,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
synchronized (events) {
events.addAll(initialEvents);
for (final EntitlementEvent cur : initialEvents) {
- recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
- @Override
- public String toString() {
- return cur.getId().toString();
- }
- });
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
}
}
final Subscription updatedSubscription = buildSubscription(null, subscription);
@@ -178,12 +175,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
synchronized (events) {
events.addAll(recreateEvents);
for (final EntitlementEvent cur : recreateEvents) {
- recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
- @Override
- public String toString() {
- return cur.getId().toString();
- }
- });
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
}
}
}
@@ -295,12 +287,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
cancelNextPhaseEvent(subscriptionId);
events.addAll(changeEvents);
for (final EntitlementEvent cur : changeEvents) {
- recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new NotificationKey() {
- @Override
- public String toString() {
- return cur.getId().toString();
- }
- });
+ recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()));
}
}
}
@@ -308,12 +295,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
private void insertEvent(final EntitlementEvent event) {
synchronized (events) {
events.add(event);
- recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new NotificationKey() {
- @Override
- public String toString() {
- return event.getId().toString();
- }
- });
+ recordFutureNotificationFromTransaction(null, event.getEffectiveDate(), new EntitlementNotificationKey(event.getId()));
}
}
@@ -403,12 +385,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
final SubscriptionData subData = curSubscription.getData();
for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
events.add(curEvent);
- recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new NotificationKey() {
- @Override
- public String toString() {
- return curEvent.getId().toString();
- }
- });
+ recordFutureNotificationFromTransaction(null, curEvent.getEffectiveDate(), new EntitlementNotificationKey(curEvent.getId()));
}
subscriptions.add(subData);
@@ -437,6 +414,8 @@ public class MockEntitlementDaoMemory implements EntitlementDao, MockEntitlement
subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey);
} catch (NoSuchNotificationQueue e) {
throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 534ee0f..5c03f01 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -30,6 +30,7 @@ import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
import com.ning.billing.entitlement.api.user.Subscription;
import com.ning.billing.invoice.InvoiceListener;
import com.ning.billing.invoice.api.DefaultInvoiceService;
+import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -52,7 +53,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
@Inject
public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
- final InvoiceConfig config, final EntitlementUserApi entitlementUserApi, final InvoiceListener listener) {
+ final InvoiceConfig config, final EntitlementUserApi entitlementUserApi, final InvoiceListener listener) {
this.notificationQueueService = notificationQueueService;
this.config = config;
this.entitlementUserApi = entitlementUserApi;
@@ -62,41 +63,46 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
@Override
public void initialize() throws NotificationQueueAlreadyExists {
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
- NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDate) {
- try {
- final UUID key = UUID.fromString(notificationKey);
- try {
- final Subscription subscription = entitlementUserApi.getSubscriptionFromId(key);
- if (subscription == null) {
- log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
- } else {
- processEvent(key, eventDate);
- }
- } catch (EntitlementUserApiException e) {
- log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
- }
- } catch (IllegalArgumentException e) {
- log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
- }
-
- }
- },
- new NotificationConfig() {
-
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- }
- );
+ NEXT_BILLING_DATE_NOTIFIER_QUEUE,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+ try {
+
+ if (! (notificationKey instanceof NextBillingDateNotificationKey)) {
+ log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
+ return;
+ }
+ final NextBillingDateNotificationKey key = (NextBillingDateNotificationKey) notificationKey;
+ try {
+ final Subscription subscription = entitlementUserApi.getSubscriptionFromId(key.getUuidKey());
+ if (subscription == null) {
+ log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
+ } else {
+ processEvent(key.getUuidKey(), eventDate);
+ }
+ } catch (EntitlementUserApiException e) {
+ log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
+ }
+ } catch (IllegalArgumentException e) {
+ log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+ }
+
+ }
+ },
+ new NotificationConfig() {
+
+ @Override
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
+ }
+
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isNotificationProcessingOff();
+ }
+ }
+ );
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index ffc08a3..f2521de 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -16,6 +16,7 @@
package com.ning.billing.invoice.notification;
+import java.io.IOException;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -51,14 +52,11 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
log.info("Queuing next billing date notification. id: {}, timestamp: {}", subscriptionId.toString(), futureNotificationTime.toString());
- nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NotificationKey() {
- @Override
- public String toString() {
- return subscriptionId.toString();
- }
- });
+ nextBillingQueue.recordFutureNotificationFromTransaction(transactionalDao, futureNotificationTime, new NextBillingDateNotificationKey(subscriptionId));
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
+ } catch (IOException e) {
+ log.error("Failed to serialize notficationKey for subscriptionId {}", subscriptionId);
}
}
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 1695aa4..0d58473 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -27,6 +27,7 @@ import com.ning.billing.config.NotificationConfig;
import com.ning.billing.overdue.OverdueProperties;
import com.ning.billing.overdue.listener.OverdueListener;
import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
@@ -47,7 +48,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
@Inject
public DefaultOverdueCheckNotifier(final NotificationQueueService notificationQueueService,
- final OverdueProperties config, final OverdueListener listener) {
+ final OverdueProperties config, final OverdueListener listener) {
this.notificationQueueService = notificationQueueService;
this.config = config;
this.listener = listener;
@@ -57,32 +58,36 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
public void initialize() {
try {
overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
- OVERDUE_CHECK_NOTIFIER_QUEUE,
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDate) {
- try {
- final UUID key = UUID.fromString(notificationKey);
- processEvent(key, eventDate);
- } catch (IllegalArgumentException e) {
- log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
- return;
- }
-
- }
- },
- new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
-
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
- }
- );
+ OVERDUE_CHECK_NOTIFIER_QUEUE,
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+ try {
+ if (! (notificationKey instanceof OverdueCheckNotificationKey)) {
+ log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
+ return;
+ }
+ final OverdueCheckNotificationKey key = (OverdueCheckNotificationKey) notificationKey;
+ processEvent(key.getUuidKey(), eventDate);
+ } catch (IllegalArgumentException e) {
+ log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
+ return;
+ }
+
+ }
+ },
+ new NotificationConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return config.isNotificationProcessingOff();
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return config.getSleepTimeMs();
+ }
+ }
+ );
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index 33c1328..79cfdf9 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -16,6 +16,8 @@
package com.ning.billing.ovedue.notification;
+import java.io.IOException;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,16 +50,12 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
- checkOverdueQueue.recordFutureNotification(futureNotificationTime, new NotificationKey() {
- @Override
- public String toString() {
- return overdueable.getId().toString();
- }
- });
+ checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId()));
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
+ } catch (IOException e) {
+ log.error("Failed to serialize notifcationKey for {}", overdueable.toString());
}
-
}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index f8c4498..292583e 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -15,6 +15,7 @@
*/
package com.ning.billing.payment.retry;
+import java.io.IOException;
import java.util.UUID;
import org.joda.time.DateTime;
@@ -43,7 +44,7 @@ public abstract class BaseRetryService implements RetryService {
private NotificationQueue retryQueue;
public BaseRetryService(final NotificationQueueService notificationQueueService,
- final Clock clock, final PaymentConfig config) {
+ final Clock clock, final PaymentConfig config) {
this.notificationQueueService = notificationQueueService;
final Clock clock1 = clock;
this.config = config;
@@ -54,13 +55,18 @@ public abstract class BaseRetryService implements RetryService {
public void initialize(final String svcName) throws NotificationQueueAlreadyExists {
retryQueue = notificationQueueService.createNotificationQueue(svcName, getQueueName(), new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
- retry(UUID.fromString(notificationKey));
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ if (! (notificationKey instanceof PaymentRetryNotificationKey)) {
+ log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
+ return;
+ }
+ final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
+ retry(key.getUuidKey());
}
},
- config);
+ config);
}
-
+
@Override
public void start() {
retryQueue.startQueue();
@@ -109,19 +115,14 @@ public abstract class BaseRetryService implements RetryService {
} catch (NoSuchNotificationQueue e) {
log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
}
- */
+ */
}
private boolean scheduleRetryInternal(final UUID paymentId, final DateTime timeOfRetry, final Transmogrifier transactionalDao) {
try {
final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, getQueueName());
- final NotificationKey key = new NotificationKey() {
- @Override
- public String toString() {
- return paymentId.toString();
- }
- };
+ final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
if (retryQueue != null) {
if (transactionalDao == null) {
retryQueue.recordFutureNotification(timeOfRetry, key);
@@ -132,6 +133,9 @@ public abstract class BaseRetryService implements RetryService {
} catch (NoSuchNotificationQueue e) {
log.error(String.format("Failed to retrieve notification queue %s:%s", DefaultPaymentService.SERVICE_NAME, getQueueName()));
return false;
+ } catch (IOException e) {
+ log.error(String.format("Failed to serialize notificationQueue event for paymentId %s", paymentId));
+ return false;
}
return true;
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index d89b07f..b315e79 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -46,7 +46,7 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
private final PersistentBusSqlDao dao;
- private final ObjectMapper objectMapper = new ObjectMapper();
+
private final EventBusDelegate eventBusDelegate;
private final Clock clock;
private final String hostname;
@@ -108,7 +108,7 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
int result = 0;
for (final BusEventEntry cur : events) {
- final BusEvent evt = deserializeBusEvent(cur.getBusEventClass(), cur.getBusEventJson());
+ final BusEvent evt = deserializeEvent(cur.getBusEventClass(), cur.getBusEventJson());
result++;
// STEPH exception handling is done by GUAVA-- logged a bug Issue-780
eventBusDelegate.post(evt);
@@ -117,16 +117,6 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
return result;
}
- private BusEvent deserializeBusEvent(final String className, final String json) {
- try {
- final Class<?> claz = Class.forName(className);
- return (BusEvent) objectMapper.readValue(json, claz);
- } catch (Exception e) {
- log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
- return null;
- }
- }
-
private List<BusEventEntry> getNextBusEvent() {
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index f5a5f62..dc3ee4d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -73,6 +73,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
stmt.bind("id", evt.getId().toString());
stmt.bind("createdDate", getDate(new DateTime()));
stmt.bind("creatingOwner", evt.getCreatedOwner());
+ stmt.bind("className", evt.getNotificationKeyClass());
stmt.bind("notificationKey", evt.getNotificationKey());
stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
stmt.bind("queueName", evt.getQueueName());
@@ -91,6 +92,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final Long ordering = r.getLong("record_id");
final UUID id = getUUID(r, "id");
final String createdOwner = r.getString("creating_owner");
+ final String className = r.getString("class_name");
final String notificationKey = r.getString("notification_key");
final String queueName = r.getString("queue_name");
final DateTime effectiveDate = getDate(r, "effective_date");
@@ -99,7 +101,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
- processingState, notificationKey, effectiveDate);
+ processingState, className, notificationKey, effectiveDate);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index fe9b7e2..1d74210 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -29,13 +29,14 @@ public class DefaultNotification extends EntityBase implements Notification {
private final String queueName;
private final DateTime nextAvailableDate;
private final PersistentQueueEntryLifecycleState lifecycleState;
+ private final String notificationKeyClass;
private final String notificationKey;
private final DateTime effectiveDate;
public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName, final DateTime nextAvailableDate,
final PersistentQueueEntryLifecycleState lifecycleState,
- final String notificationKey, final DateTime effectiveDate) {
+ final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
super(id);
this.ordering = ordering;
this.owner = owner;
@@ -43,12 +44,13 @@ public class DefaultNotification extends EntityBase implements Notification {
this.queueName = queueName;
this.nextAvailableDate = nextAvailableDate;
this.lifecycleState = lifecycleState;
+ this.notificationKeyClass = notificationKeyClass;
this.notificationKey = notificationKey;
this.effectiveDate = effectiveDate;
}
- public DefaultNotification(final String queueName, final String createdOwner, final String notificationKey, final DateTime effectiveDate) {
- this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKey, effectiveDate);
+ public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass, final String notificationKey, final DateTime effectiveDate) {
+ this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKeyClass, notificationKey, effectiveDate);
}
@Override
@@ -91,6 +93,12 @@ public class DefaultNotification extends EntityBase implements Notification {
}
@Override
+ public String getNotificationKeyClass() {
+ return notificationKeyClass;
+ }
+
+
+ @Override
public String getNotificationKey() {
return notificationKey;
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 607c1ba..19923b3 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -16,6 +16,7 @@
package com.ning.billing.util.notificationq;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -25,6 +26,7 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.config.NotificationConfig;
+import com.ning.billing.util.bus.dao.BusEventEntry;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
@@ -56,7 +58,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
nbProcessedEvents.incrementAndGet();
logDebug("handling notification %s, key = %s for time %s",
cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
+ NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ handler.handleReadyNotification(key, cur.getEffectiveDate());
result++;
clearNotification(cur);
logDebug("done handling notification %s, key = %s for time %s",
@@ -65,20 +68,24 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
return result;
}
+
@Override
- public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) {
- final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
- dao.insertNotification(notification);
+ public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+ recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao);
}
@Override
public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
- final DateTime futureNotificationTime, final NotificationKey notificationKey) {
+ final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
final NotificationSqlDao transactionalNotificationDao = transactionalDao.become(NotificationSqlDao.class);
- final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.toString(), futureNotificationTime);
- transactionalNotificationDao.insertNotification(notification);
+ recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao);
+ }
+
+ private void recordFutureNotificationInternal(final DateTime futureNotificationTime, final NotificationKey notificationKey, final NotificationSqlDao thisDao) throws IOException {
+ final String json = objectMapper.writeValueAsString(notificationKey);
+ final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
+ thisDao.insertNotification(notification);
}
-
private void clearNotification(final Notification cleared) {
dao.clearNotification(cleared.getId().toString(), hostname);
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 53fd0e0..2ecea54 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -24,6 +24,8 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
public interface Notification extends PersistentQueueEntryLifecycle, Entity {
public Long getOrdering();
+ public String getNotificationKeyClass();
+
public String getNotificationKey();
public DateTime getEffectiveDate();
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
index 2cde9bf..534a99e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationKey.java
@@ -21,6 +21,4 @@ package com.ning.billing.util.notificationq;
*/
public interface NotificationKey {
- @Override
- public String toString();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 6e09320..678da46 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -16,6 +16,8 @@
package com.ning.billing.util.notificationq;
+import java.io.IOException;
+
import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -29,7 +31,8 @@ public interface NotificationQueue extends QueueLifecycle {
* @param futureNotificationTime the time at which the notification is ready
* @param notificationKey the key for that notification
*/
- public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey);
+ public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey)
+ throws IOException;
/**
* Record from within a transaction the need to be called back when the notification is ready
@@ -39,7 +42,8 @@ public interface NotificationQueue extends QueueLifecycle {
* @param notificationKey the key for that notification
*/
public void recordFutureNotificationFromTransaction(final Transmogrifier transactionalDao,
- final DateTime futureNotificationTime, final NotificationKey notificationKey);
+ final DateTime futureNotificationTime, final NotificationKey notificationKey)
+ throws IOException;
/**
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index e66d803..3b3d74d 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -29,7 +29,7 @@ public interface NotificationQueueService {
*
* @param notificationKey the notification key associated to that notification entry
*/
- public void handleReadyNotification(String notificationKey, DateTime eventDateTime);
+ public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime);
}
public static final class NotificationQueueAlreadyExists extends Exception {
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index e938057..09bb1b5 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -22,7 +22,9 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.ning.billing.config.PersistentQueueConfig;
+import com.ning.billing.util.jackson.ObjectMapper;
public abstract class PersistentQueueBase implements QueueLifecycle {
@@ -35,15 +37,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final Executor executor;
private final String svcName;
private final long sleepTimeMs;
-
private boolean isProcessingEvents;
private int curActiveThreads;
+ protected final ObjectMapper objectMapper;
+
public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
this.svcName = svcName;
this.sleepTimeMs = config.getSleepTimeMs();
+ this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
}
@@ -154,6 +158,18 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
curActiveThreads = 0;
}
}
+
+ protected <T> T deserializeEvent(final String className, final String json) {
+ try {
+ final Class<?> claz = Class.forName(className);
+ return (T) objectMapper.readValue(json, claz);
+ } catch (Exception e) {
+ log.error(String.format("Failed to deserialize json object %s for class %s", json, className), e);
+ return null;
+ }
+ }
+
+
@Override
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 9ea7c1a..c61cd30 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -102,7 +102,8 @@ CREATE TABLE notifications (
record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
id char(36) NOT NULL,
created_date datetime NOT NULL,
- notification_key varchar(256) NOT NULL,
+ class_name varchar(256) NOT NULL,
+ notification_key varchar(2048) NOT NULL,
creating_owner char(50) NOT NULL,
effective_date datetime NOT NULL,
queue_name char(64) NOT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 0731adc..13ff029 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -4,6 +4,7 @@ getReadyNotifications() ::= <<
select
record_id
, id
+ , class_name
, notification_key
, created_date
, creating_owner
@@ -62,6 +63,7 @@ removeNotificationsByKey() ::= <<
insertNotification() ::= <<
insert into notifications (
id
+ , class_name
, notification_key
, created_date
, creating_owner
@@ -72,6 +74,7 @@ insertNotification() ::= <<
, processing_state
) values (
:id
+ , :className
, :notificationKey
, :createdDate
, :creatingOwner
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 4b5ec17..d7141d3 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -102,7 +102,7 @@ public class TestNotificationSqlDao {
final String notificationKey = UUID.randomUUID().toString();
final DateTime effDt = new DateTime();
- final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey, effDt);
+ final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, effDt);
dao.insertNotification(notif);
Thread.sleep(1000);
@@ -149,6 +149,7 @@ public class TestNotificationSqlDao {
final Notification res = handle.createQuery(" select" +
" record_id " +
", id" +
+ ", class_name" +
", notification_key" +
", created_date" +
", creating_owner" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 258e6da..5e7d607 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -16,6 +16,7 @@
package com.ning.billing.util.notificationq;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
@@ -25,6 +26,7 @@ import java.util.TreeSet;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.ning.billing.config.NotificationConfig;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
@@ -33,6 +35,8 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueue
public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
private final TreeSet<Notification> notifications;
+ ObjectMapper objectMapper = new ObjectMapper();
+
public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
super(clock, svcName, queueName, handler, config);
notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@@ -48,8 +52,9 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) {
- final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.toString(), futureNotificationTime);
+ public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey) throws IOException {
+ final String json = objectMapper.writeValueAsString(notificationKey);
+ final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, futureNotificationTime);
synchronized (notifications) {
notifications.add(notification);
}
@@ -58,7 +63,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
@Override
public void recordFutureNotificationFromTransaction(
final Transmogrifier transactionalDao, final DateTime futureNotificationTime,
- final NotificationKey notificationKey) {
+ final NotificationKey notificationKey) throws IOException {
recordFutureNotification(futureNotificationTime, notificationKey);
}
@@ -94,8 +99,12 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result = readyNotifications.size();
for (final Notification cur : readyNotifications) {
- handler.handleReadyNotification(cur.getNotificationKey(), cur.getEffectiveDate());
- final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKey(), cur.getEffectiveDate());
+
+
+ NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ handler.handleReadyNotification(key, cur.getEffectiveDate());
+ final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), hostname, hostname, "MockQueue", clock.getUTCNow().plus(CLAIM_TIME_MS), PersistentQueueEntryLifecycleState.PROCESSED,
+ cur.getNotificationKeyClass(), cur.getNotificationKey(), cur.getEffectiveDate());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index d972537..11db32e 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -40,6 +40,8 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.inject.AbstractModule;
@@ -84,6 +86,28 @@ public class TestNotificationQueue {
helper.initDb(testDdl);
}
+
+ private final static class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
+
+ private final String value;
+
+ @JsonCreator
+ public TestNotificationKey(@JsonProperty("value") String value) {
+ super();
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public int compareTo(TestNotificationKey arg0) {
+ return value.compareTo(arg0.value);
+ }
+ }
+
+
@BeforeSuite(groups = "slow")
public void setup() throws Exception {
startMysql();
@@ -124,21 +148,21 @@ public class TestNotificationQueue {
@Test(groups = {"slow"}, enabled = true)
public void testSimpleNotification() throws Exception {
- final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+ final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
- synchronized (expectedNotifications) {
- log.info("Handler received key: " + notificationKey);
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey);
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 1, 10000));
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 1, 10000));
queue.startQueue();
@@ -147,24 +171,20 @@ public class TestNotificationQueue {
final DummyObject obj = new DummyObject("foo", key);
final DateTime now = new DateTime();
final DateTime readyTime = now.plusMillis(2000);
- final NotificationKey notificationKey = new NotificationKey() {
- @Override
- public String toString() {
- return key.toString();
- }
- };
- expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+ final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+
+ expectedNotifications.put(notificationKey, Boolean.FALSE);
// Insert dummy to be processed in 2 sec'
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@Override
public Void inTransaction(final DummySqlTest transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
transactional.insertDummy(obj);
queue.recordFutureNotificationFromTransaction(transactional,
- readyTime, notificationKey);
+ readyTime, notificationKey);
log.info("Posted key: " + notificationKey);
return null;
@@ -178,29 +198,29 @@ public class TestNotificationQueue {
await().atMost(1, MINUTES).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- return expectedNotifications.get(notificationKey.toString());
+ return expectedNotifications.get(notificationKey);
}
});
queue.stopQueue();
- Assert.assertTrue(expectedNotifications.get(notificationKey.toString()));
+ Assert.assertTrue(expectedNotifications.get(notificationKey));
}
@Test(groups = "slow")
public void testManyNotifications() throws InterruptedException {
- final Map<String, Boolean> expectedNotifications = new TreeMap<String, Boolean>();
+ final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000));
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ synchronized (expectedNotifications) {
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
queue.startQueue();
@@ -215,22 +235,17 @@ public class TestNotificationQueue {
final DummyObject obj = new DummyObject("foo", key);
final int currentIteration = i;
- final NotificationKey notificationKey = new NotificationKey() {
- @Override
- public String toString() {
- return key.toString();
- }
- };
- expectedNotifications.put(notificationKey.toString(), Boolean.FALSE);
+ final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+ expectedNotifications.put(notificationKey, Boolean.FALSE);
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@Override
public Void inTransaction(final DummySqlTest transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
transactional.insertDummy(obj);
queue.recordFutureNotificationFromTransaction(transactional,
- now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
+ now.plus((currentIteration + 1) * nextReadyTimeIncrementMs), notificationKey);
return null;
}
});
@@ -266,6 +281,12 @@ public class TestNotificationQueue {
} while (nbTry-- > 0);
queue.stopQueue();
+ log.info("STEPH GOT SIZE " + Collections2.filter(expectedNotifications.values(), new Predicate<Boolean>() {
+ @Override
+ public boolean apply(final Boolean input) {
+ return input;
+ }
+ }).size());
assertEquals(success, true);
}
@@ -278,8 +299,8 @@ public class TestNotificationQueue {
@Test(groups = {"slow"}, enabled = true)
public void testMultipleHandlerNotification() throws Exception {
- final Map<String, Boolean> expectedNotificationsFred = new TreeMap<String, Boolean>();
- final Map<String, Boolean> expectedNotificationsBarney = new TreeMap<String, Boolean>();
+ final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
+ final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock);
@@ -298,23 +319,23 @@ public class TestNotificationQueue {
final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
log.info("Fred received key: " + notificationKey);
expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
},
- config);
+ config);
final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final String notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
log.info("Barney received key: " + notificationKey);
expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
},
- config);
+ config);
queueFred.startQueue();
// We don't start Barney so it can never pick up notifications
@@ -324,37 +345,27 @@ public class TestNotificationQueue {
final DummyObject obj = new DummyObject("foo", key);
final DateTime now = new DateTime();
final DateTime readyTime = now.plusMillis(2000);
- final NotificationKey notificationKeyFred = new NotificationKey() {
- @Override
- public String toString() {
- return "Fred";
- }
- };
+ final NotificationKey notificationKeyFred = new TestNotificationKey("Fred");
- final NotificationKey notificationKeyBarney = new NotificationKey() {
- @Override
- public String toString() {
- return "Barney";
- }
- };
+ final NotificationKey notificationKeyBarney = new TestNotificationKey("Barney");
- expectedNotificationsFred.put(notificationKeyFred.toString(), Boolean.FALSE);
- expectedNotificationsFred.put(notificationKeyBarney.toString(), Boolean.FALSE);
+ expectedNotificationsFred.put(notificationKeyFred, Boolean.FALSE);
+ expectedNotificationsFred.put(notificationKeyBarney, Boolean.FALSE);
// Insert dummy to be processed in 2 sec'
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@Override
public Void inTransaction(final DummySqlTest transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
transactional.insertDummy(obj);
queueFred.recordFutureNotificationFromTransaction(transactional,
- readyTime, notificationKeyFred);
+ readyTime, notificationKeyFred);
log.info("posted key: " + notificationKeyFred.toString());
queueBarney.recordFutureNotificationFromTransaction(transactional,
- readyTime, notificationKeyBarney);
+ readyTime, notificationKeyBarney);
log.info("posted key: " + notificationKeyBarney.toString());
return null;
@@ -379,12 +390,12 @@ public class TestNotificationQueue {
}
queueFred.stopQueue();
- Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred.toString()));
- Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney.toString()));
+ Assert.assertTrue(expectedNotificationsFred.get(notificationKeyFred));
+ Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
}
NotificationConfig getNotificationConfig(final boolean off,
- final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
+ final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
return new NotificationConfig() {
@Override
public boolean isNotificationProcessingOff() {
@@ -403,31 +414,21 @@ public class TestNotificationQueue {
public void testRemoveNotifications() throws InterruptedException {
final UUID key = UUID.randomUUID();
- final NotificationKey notificationKey = new NotificationKey() {
- @Override
- public String toString() {
- return key.toString();
- }
- };
+ final NotificationKey notificationKey = new TestNotificationKey(key.toString());
final UUID key2 = UUID.randomUUID();
- final NotificationKey notificationKey2 = new NotificationKey() {
- @Override
- public String toString() {
- return key2.toString();
- }
- };
+ final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final String key, final DateTime eventDateTime) {
- if (key.equals(notificationKey) || key.equals(notificationKey2)) { //ignore stray events from other tests
- log.info("Received notification with key: " + notificationKey);
- eventsReceived++;
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000));
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+ if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+ log.info("Received notification with key: " + notificationKey);
+ eventsReceived++;
+ }
+ }
+ },
+ getNotificationConfig(false, 100, 10, 10000));
queue.startQueue();
@@ -440,14 +441,14 @@ public class TestNotificationQueue {
dao.inTransaction(new Transaction<Void, DummySqlTest>() {
@Override
public Void inTransaction(final DummySqlTest transactional,
- final TransactionStatus status) throws Exception {
+ final TransactionStatus status) throws Exception {
queue.recordFutureNotificationFromTransaction(transactional,
- start.plus(nextReadyTimeIncrementMs), notificationKey);
+ start.plus(nextReadyTimeIncrementMs), notificationKey);
queue.recordFutureNotificationFromTransaction(transactional,
- start.plus(2 * nextReadyTimeIncrementMs), notificationKey);
+ start.plus(2 * nextReadyTimeIncrementMs), notificationKey);
queue.recordFutureNotificationFromTransaction(transactional,
- start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
+ start.plus(3 * nextReadyTimeIncrementMs), notificationKey2);
return null;
}
});