killbill-memoizeit
Changes
subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java 2(+1 -1)
subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java 13(+7 -6)
subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java 49(+26 -23)
subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java 66(+42 -24)
Details
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java b/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
index a4fefd0..200a0fc 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/SubscriptionBaseApiService.java
@@ -81,7 +81,7 @@ public interface SubscriptionBaseApiService {
String priceList, List<PlanPhasePriceOverride> overrides, BillingActionPolicy policy, CallContext context)
throws SubscriptionBaseApiException;
- public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException;
+ public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException;
public PlanChangeResult getPlanChangeResult(final DefaultSubscriptionBase subscription, final String productName,
final BillingPeriod term, final String priceList, final DateTime effectiveDate, TenantContext context) throws SubscriptionBaseApiException;
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
index cc18c43..cb7804e 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
@@ -519,13 +519,14 @@ public class DefaultSubscriptionBaseApiService implements SubscriptionBaseApiSer
}
@Override
- public int cancelAddOnsIfRequired(final Product baseProduct, final UUID bundleId, final DateTime effectiveDate, final CallContext context) throws CatalogApiException {
+ public int cancelAddOnsIfRequiredOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
+ final Product baseProduct = (subscription.getState() == EntitlementState.CANCELLED) ? null : subscription.getCurrentPlan().getProduct();
+
final List<SubscriptionBaseEvent> cancelEvents = new LinkedList<SubscriptionBaseEvent>();
- final InternalCallContext internalCallContext = createCallContextFromBundleId(bundleId, context);
- final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, bundleId, effectiveDate, internalCallContext);
- if (!subscriptionsToBeCancelled.isEmpty()) {
- dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, internalCallContext);
- }
+ final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
+ final List<DefaultSubscriptionBase> subscriptionsToBeCancelled = computeAddOnsToCancel(cancelEvents, baseProduct, subscription.getBundleId(), event.getEffectiveDate(), internalCallContext);
+ dao.cancelSubscriptionsOnBasePlanEvent(subscription, event, subscriptionsToBeCancelled, cancelEvents, internalCallContext);
+
return subscriptionsToBeCancelled.size();
}
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
index 8b4fbe8..18dd082 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/core/DefaultSubscriptionBaseService.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -23,10 +23,7 @@ import java.util.UUID;
import org.joda.time.DateTime;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.catalog.api.CatalogApiException;
-import org.killbill.billing.catalog.api.Product;
import org.killbill.billing.catalog.api.ProductCategory;
-import org.killbill.billing.entitlement.api.Entitlement.EntitlementState;
-import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.platform.api.LifecycleHandlerType;
import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.subscription.alignment.PlanAligner;
@@ -41,12 +38,12 @@ import org.killbill.billing.subscription.events.SubscriptionBaseEvent;
import org.killbill.billing.subscription.events.SubscriptionBaseEvent.EventType;
import org.killbill.billing.subscription.events.phase.PhaseEvent;
import org.killbill.billing.subscription.events.phase.PhaseEventData;
-import org.killbill.billing.subscription.events.user.ApiEvent;
import org.killbill.billing.subscription.exceptions.SubscriptionBaseError;
import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
@@ -74,9 +71,10 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
private final PersistentBus eventBus;
private final NotificationQueueService notificationQueueService;
private final InternalCallContextFactory internalCallContextFactory;
- private NotificationQueue subscriptionEventQueue;
private final SubscriptionBaseApiService apiService;
+ private NotificationQueue subscriptionEventQueue;
+
@Inject
public DefaultSubscriptionBaseService(final Clock clock, final SubscriptionDao dao, final PlanAligner planAligner,
final PersistentBus eventBus,
@@ -159,22 +157,24 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
return;
}
- //
- // Do any internal processing on that event before we send the event to the bus
- //
- int theRealSeqId = seqId;
+ boolean eventSent = false;
if (event.getType() == EventType.PHASE) {
- onPhaseEvent(subscription, context);
+ eventSent = onPhaseEvent(subscription, event, context);
} else if (event.getType() == EventType.API_USER && subscription.getCategory() == ProductCategory.BASE) {
final CallContext callContext = internalCallContextFactory.createCallContext(context);
- theRealSeqId = onBasePlanEvent(subscription, (ApiEvent) event, callContext);
+ eventSent = onBasePlanEvent(subscription, event, callContext);
}
- final SubscriptionBaseTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
- final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
- context.getUserToken(),
- context.getAccountRecordId(), context.getTenantRecordId());
- eventBus.post(busEvent);
+ if (!eventSent) {
+ // Methods above invoking the DAO will send this event directly from the transaction
+ final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(event, seqId);
+ final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+ subscription.getAlignStartDate(),
+ context.getUserToken(),
+ context.getAccountRecordId(),
+ context.getTenantRecordId());
+ eventBus.post(busEvent);
+ }
} catch (final EventBusException e) {
log.warn("Failed to post subscription event " + event, e);
} catch (final CatalogApiException e) {
@@ -182,7 +182,7 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
}
}
- private void onPhaseEvent(final DefaultSubscriptionBase subscription, final InternalCallContext context) {
+ private boolean onPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final InternalCallContext context) {
try {
final DateTime now = clock.getUTCNow();
final TimedPhase nextTimedPhase = planAligner.getNextTimedPhase(subscription, now, context);
@@ -191,15 +191,18 @@ public class DefaultSubscriptionBaseService implements EventListener, Subscripti
nextTimedPhase.getPhase().getName(), nextTimedPhase.getStartPhase()) :
null;
if (nextPhaseEvent != null) {
- dao.createNextPhaseEvent(subscription, nextPhaseEvent, context);
+ dao.createNextPhaseEvent(subscription, readyPhaseEvent, nextPhaseEvent, context);
+ return true;
}
} catch (final SubscriptionBaseError e) {
log.error(String.format("Failed to insert next phase for subscription %s", subscription.getId()), e);
}
+
+ return false;
}
- private int onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final ApiEvent event, final CallContext context) throws CatalogApiException {
- final Product baseProduct = (baseSubscription.getState() == EntitlementState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
- return apiService.cancelAddOnsIfRequired(baseProduct, baseSubscription.getBundleId(), event.getEffectiveDate(), context);
+ private boolean onBasePlanEvent(final DefaultSubscriptionBase baseSubscription, final SubscriptionBaseEvent event, final CallContext context) throws CatalogApiException {
+ apiService.cancelAddOnsIfRequiredOnBasePlanEvent(baseSubscription, event, context);
+ return true;
}
}
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
index b6768f5..1e6c118 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
@@ -45,7 +45,6 @@ import org.killbill.billing.catalog.api.Plan;
import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.SubscriptionApiException;
import org.killbill.billing.entity.EntityPersistenceException;
-import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.subscription.api.SubscriptionBase;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.subscription.api.migration.AccountMigrationData;
@@ -86,6 +85,7 @@ import org.killbill.billing.util.entity.dao.EntityDaoBase;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import org.killbill.bus.api.BusEvent;
import org.killbill.bus.api.PersistentBus;
import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
@@ -405,20 +405,21 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
}
@Override
- public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
+ public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final SubscriptionBaseEvent nextPhaseEvent, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
final SubscriptionEventSqlDao transactional = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class);
final UUID subscriptionId = subscription.getId();
cancelNextPhaseEventFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
- transactional.create(new SubscriptionEventModelDao(nextPhase), context);
+ transactional.create(new SubscriptionEventModelDao(nextPhaseEvent), context);
recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- nextPhase.getEffectiveDate(),
- new SubscriptionNotificationKey(nextPhase.getId()), context);
+ nextPhaseEvent.getEffectiveDate(),
+ new SubscriptionNotificationKey(nextPhaseEvent.getId()), context);
- // Notify the Bus of the requested change
- notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhase, SubscriptionBaseTransitionType.PHASE, context);
+ // Notify the Bus
+ notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, nextPhaseEvent, SubscriptionBaseTransitionType.PHASE, context);
+ notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, readyPhaseEvent, 0, context);
return null;
}
@@ -561,6 +562,19 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
}
@Override
+ public void cancelSubscriptionsOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+ cancelSubscriptionsFromTransaction(entitySqlDaoWrapperFactory, subscriptions, cancelEvents, context);
+ // Make sure to always send the event, even if there were no subscriptions to cancel
+ notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, subscriptions.size(), context);
+ return null;
+ }
+ });
+ }
+
+ @Override
public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
@@ -1046,12 +1060,12 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
}
//
- // Either records a notfication or sends a bus event is operation is immediate
+ // Either records a notification or sends a bus event if operation is immediate
//
private void recordBusOrFutureNotificationFromTransaction(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final boolean busEvent,
final int seqId, final InternalCallContext context) {
if (busEvent) {
- notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+ rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
} else {
recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
event.getEffectiveDate(),
@@ -1060,25 +1074,29 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
}
}
- //
- // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
- // - CREATE,
- // - IMM CANCEL or CHANGE
- //
- private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
- final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+ // Sends bus notification for event on effective date -- only used for operation that happen immediately
+ private void rebuildSubscriptionAndNotifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
+ final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
try {
final DefaultSubscriptionBase upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent, context);
+ notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, upToDateSubscription, immediateEvent, seqId, context);
+ } catch (final CatalogApiException e) {
+ log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
+ }
+ }
- final SubscriptionBaseTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
- final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
- context.getUserToken(),
- context.getAccountRecordId(), context.getTenantRecordId());
+ private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final DefaultSubscriptionBase subscription,
+ final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+ try {
+ final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(immediateEvent, seqId);
+ final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+ subscription.getAlignStartDate(),
+ context.getUserToken(),
+ context.getAccountRecordId(),
+ context.getTenantRecordId());
eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getHandle().getConnection());
- } catch (EventBusException e) {
- log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
- } catch (CatalogApiException e) {
+ } catch (final EventBusException e) {
log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
}
}
@@ -1087,7 +1105,7 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
final SubscriptionBaseEvent nextEvent, final SubscriptionBaseTransitionType transitionType, final InternalCallContext context) {
try {
eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent, transitionType, context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken()), entitySqlDaoWrapperFactory.getHandle().getConnection());
- } catch (EventBusException e) {
+ } catch (final EventBusException e) {
log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
}
}
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
index 12178d8..487fae3 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/SubscriptionDao.java
@@ -69,7 +69,7 @@ public interface SubscriptionDao extends EntityDao<SubscriptionBundleModelDao, S
public void updateChargedThroughDate(DefaultSubscriptionBase subscription, InternalCallContext context);
// Event apis
- public void createNextPhaseEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent nextPhase, InternalCallContext context);
+ public void createNextPhaseEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent readyPhaseEvent, SubscriptionBaseEvent nextPhase, InternalCallContext context);
public SubscriptionBaseEvent getEventById(UUID eventId, InternalTenantContext context);
@@ -86,6 +86,8 @@ public interface SubscriptionDao extends EntityDao<SubscriptionBundleModelDao, S
public void recreateSubscription(DefaultSubscriptionBase subscription, List<SubscriptionBaseEvent> recreateEvents, InternalCallContext context);
+ public void cancelSubscriptionsOnBasePlanEvent(DefaultSubscriptionBase subscription, SubscriptionBaseEvent event, List<DefaultSubscriptionBase> subscriptions, List<SubscriptionBaseEvent> cancelEvents, InternalCallContext context);
+
public void cancelSubscriptions(List<DefaultSubscriptionBase> subscriptions, List<SubscriptionBaseEvent> cancelEvents, InternalCallContext context);
public void uncancelSubscription(DefaultSubscriptionBase subscription, List<SubscriptionBaseEvent> uncancelEvents, InternalCallContext context);
diff --git a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
index 57113b5..f7947f5 100644
--- a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
+++ b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/MockSubscriptionDaoMemory.java
@@ -42,9 +42,11 @@ import org.killbill.billing.subscription.api.migration.AccountMigrationData;
import org.killbill.billing.subscription.api.migration.AccountMigrationData.BundleMigrationData;
import org.killbill.billing.subscription.api.migration.AccountMigrationData.SubscriptionMigrationData;
import org.killbill.billing.subscription.api.transfer.TransferCancelData;
+import org.killbill.billing.subscription.api.user.DefaultEffectiveSubscriptionEvent;
import org.killbill.billing.subscription.api.user.DefaultSubscriptionBase;
import org.killbill.billing.subscription.api.user.DefaultSubscriptionBaseBundle;
import org.killbill.billing.subscription.api.user.SubscriptionBaseBundle;
+import org.killbill.billing.subscription.api.user.SubscriptionBaseTransitionData;
import org.killbill.billing.subscription.api.user.SubscriptionBuilder;
import org.killbill.billing.subscription.engine.core.DefaultSubscriptionBaseService;
import org.killbill.billing.subscription.engine.core.SubscriptionNotificationKey;
@@ -57,6 +59,9 @@ import org.killbill.billing.util.entity.DefaultPagination;
import org.killbill.billing.util.entity.Pagination;
import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import org.killbill.billing.util.entity.dao.MockEntityDaoBase;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
@@ -78,18 +83,21 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
private final MockNonEntityDao mockNonEntityDao;
private final Clock clock;
private final NotificationQueueService notificationQueueService;
+ private final PersistentBus eventBus;
private final CatalogService catalogService;
@Inject
public MockSubscriptionDaoMemory(final MockNonEntityDao mockNonEntityDao,
final Clock clock,
final NotificationQueueService notificationQueueService,
+ final PersistentBus eventBus,
final CatalogService catalogService) {
super();
this.mockNonEntityDao = mockNonEntityDao;
this.clock = clock;
this.catalogService = catalogService;
this.notificationQueueService = notificationQueueService;
+ this.eventBus = eventBus;
this.bundles = new ArrayList<SubscriptionBaseBundle>();
this.subscriptions = new ArrayList<SubscriptionBase>();
this.events = new TreeSet<SubscriptionBaseEvent>();
@@ -305,9 +313,10 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
}
@Override
- public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
+ public void createNextPhaseEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent readyPhaseEvent, final SubscriptionBaseEvent nextPhase, final InternalCallContext context) {
cancelNextPhaseEvent(subscription.getId(), context);
insertEvent(nextPhase, context);
+ notifyBusOfEffectiveImmediateChange(subscription, readyPhaseEvent, 0, context);
}
private SubscriptionBase buildSubscription(final DefaultSubscriptionBase in, final InternalTenantContext context) {
@@ -341,6 +350,12 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
}
@Override
+ public void cancelSubscriptionsOnBasePlanEvent(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent event, final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
+ cancelSubscriptions(subscriptions, cancelEvents, context);
+ notifyBusOfEffectiveImmediateChange(subscription, event, subscriptions.size(), context);
+ }
+
+ @Override
public void cancelSubscriptions(final List<DefaultSubscriptionBase> subscriptions, final List<SubscriptionBaseEvent> cancelEvents, final InternalCallContext context) {
synchronized (events) {
for (int i = 0; i < subscriptions.size(); i++) {
@@ -493,6 +508,21 @@ public class MockSubscriptionDaoMemory extends MockEntityDaoBase<SubscriptionBun
}
}
+ private void notifyBusOfEffectiveImmediateChange(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent immediateEvent, final int seqId, final InternalCallContext context) {
+ try {
+ final SubscriptionBaseTransitionData transition = subscription.getTransitionFromEvent(immediateEvent, seqId);
+ final BusEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition,
+ subscription.getAlignStartDate(),
+ context.getUserToken(),
+ context.getAccountRecordId(),
+ context.getTenantRecordId());
+
+ eventBus.post(busEvent);
+ } catch (final EventBusException e) {
+ log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
+ }
+ }
+
@Override
public Iterable<SubscriptionBaseEvent> getFutureEventsForAccount(final InternalTenantContext context) {
return null;