diff --git a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
index 7aec7aa..21edeeb 100644
--- a/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
+++ b/subscription/src/main/java/com/ning/billing/subscription/engine/dao/DefaultSubscriptionDao.java
@@ -39,10 +39,20 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.ErrorCode;
import com.ning.billing.bus.api.PersistentBus;
import com.ning.billing.bus.api.PersistentBus.EventBusException;
+import com.ning.billing.callcontext.InternalCallContext;
+import com.ning.billing.callcontext.InternalTenantContext;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.clock.Clock;
+import com.ning.billing.entity.EntityPersistenceException;
+import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
+import com.ning.billing.events.RepairSubscriptionInternalEvent;
+import com.ning.billing.notificationq.api.NotificationEvent;
+import com.ning.billing.notificationq.api.NotificationQueue;
+import com.ning.billing.notificationq.api.NotificationQueueService;
+import com.ning.billing.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import com.ning.billing.subscription.api.SubscriptionBase;
import com.ning.billing.subscription.api.migration.AccountMigrationData;
import com.ning.billing.subscription.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.subscription.api.migration.AccountMigrationData.SubscriptionMigrationData;
@@ -54,13 +64,13 @@ import com.ning.billing.subscription.api.user.DefaultRequestedSubscriptionEvent;
import com.ning.billing.subscription.api.user.DefaultSubscriptionBase;
import com.ning.billing.subscription.api.user.DefaultSubscriptionBaseBundle;
import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
-import com.ning.billing.subscription.api.user.SubscriptionBuilder;
import com.ning.billing.subscription.api.user.SubscriptionBaseTransitionData;
+import com.ning.billing.subscription.api.user.SubscriptionBuilder;
import com.ning.billing.subscription.engine.addon.AddonUtils;
import com.ning.billing.subscription.engine.core.DefaultSubscriptionBaseService;
import com.ning.billing.subscription.engine.core.SubscriptionNotificationKey;
-import com.ning.billing.subscription.engine.dao.model.SubscriptionEventModelDao;
import com.ning.billing.subscription.engine.dao.model.SubscriptionBundleModelDao;
+import com.ning.billing.subscription.engine.dao.model.SubscriptionEventModelDao;
import com.ning.billing.subscription.engine.dao.model.SubscriptionModelDao;
import com.ning.billing.subscription.events.SubscriptionBaseEvent;
import com.ning.billing.subscription.events.SubscriptionBaseEvent.EventType;
@@ -72,27 +82,20 @@ import com.ning.billing.subscription.events.user.ApiEventChange;
import com.ning.billing.subscription.events.user.ApiEventMigrateBilling;
import com.ning.billing.subscription.events.user.ApiEventType;
import com.ning.billing.subscription.exceptions.SubscriptionBaseError;
-import com.ning.billing.notificationq.api.NotificationEvent;
-import com.ning.billing.notificationq.api.NotificationQueue;
-import com.ning.billing.notificationq.api.NotificationQueueService;
-import com.ning.billing.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
-import com.ning.billing.subscription.api.SubscriptionBase;
import com.ning.billing.util.cache.CacheControllerDispatcher;
-import com.ning.billing.callcontext.InternalCallContext;
-import com.ning.billing.callcontext.InternalTenantContext;
import com.ning.billing.util.dao.NonEntityDao;
-import com.ning.billing.entity.EntityPersistenceException;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
-import com.ning.billing.events.EffectiveSubscriptionInternalEvent;
-import com.ning.billing.events.RepairSubscriptionInternalEvent;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
public class DefaultSubscriptionDao implements SubscriptionDao {
@@ -236,10 +239,9 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
return buildSubscription(shellSubscription, context);
}
-
@Override
public List<SubscriptionBase> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
- return buildBundleSubscriptions(bundleId, getSubscriptionFromBundleId(bundleId, context), context);
+ return buildBundleSubscriptions(getSubscriptionFromBundleId(bundleId, context), null, context);
}
private List<SubscriptionBase> getSubscriptionFromBundleId(final UUID bundleId, final InternalTenantContext context) {
@@ -257,13 +259,37 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
});
}
+
@Override
public Map<UUID, List<SubscriptionBase>> getSubscriptionsForAccount(final InternalTenantContext context) {
final Map<UUID, List<SubscriptionBase>> subscriptionsFromAccountId = getSubscriptionsFromAccountId(context);
+ final List<SubscriptionBaseEvent> eventsForAccount = getEventsForAccountId(context);
+
final Map<UUID, List<SubscriptionBase>> result = new HashMap<UUID, List<SubscriptionBase>>();
for (final UUID bundleId : subscriptionsFromAccountId.keySet()) {
- result.put(bundleId, buildBundleSubscriptions(bundleId, subscriptionsFromAccountId.get(bundleId), context));
+
+ final List<SubscriptionBase> subscriptionsForBundle = subscriptionsFromAccountId.get(bundleId);
+ final Collection<UUID> subscriptionIdsForBundle = Collections2.transform(subscriptionsForBundle, new Function<SubscriptionBase, UUID>() {
+ @Override
+ public UUID apply(final SubscriptionBase input) {
+ return input.getId();
+ }
+ });
+ final Multimap<UUID, SubscriptionBaseEvent> eventsForSubscriptions = ArrayListMultimap.create();
+
+ for (final SubscriptionBase cur : subscriptionsForBundle) {
+ final Collection<SubscriptionBaseEvent> events= Collections2.filter(eventsForAccount, new Predicate<SubscriptionBaseEvent>() {
+ @Override
+ public boolean apply(final SubscriptionBaseEvent input) {
+ return input.getSubscriptionId().equals(cur.getId());
+
+ }
+ });
+ eventsForSubscriptions.putAll(cur.getId(), ImmutableList.copyOf(events));
+ }
+
+ result.put(bundleId, buildBundleSubscriptions(subscriptionsForBundle, eventsForSubscriptions,context));
}
return result;
}
@@ -292,23 +318,6 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
return result;
}
- /*
- @Override
- public List<SubscriptionBase> getSubscriptionsForAccountAndKey(final UUID accountId,
- final String bundleKey, final InternalTenantContext callcontext) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<SubscriptionBase>>() {
- @Override
- public List<SubscriptionBase> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
- final SubscriptionBundleModelDao bundleModel = entitySqlDaoWrapperFactory.become(BundleSqlDao.class).getBundlesFromAccountAndKey(accountId.toString(), bundleKey, callcontext);
- if (bundleModel == null) {
- return Collections.emptyList();
- }
- return getSubscriptions(bundleModel.getId(), callcontext);
- }
- });
- }
- */
-
@Override
public void updateChargedThroughDate(final DefaultSubscriptionBase subscription, final InternalCallContext context) {
final Date ctd = (subscription.getChargedThroughDate() != null) ? subscription.getChargedThroughDate().toDate() : null;
@@ -365,23 +374,12 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
@Override
public List<SubscriptionBaseEvent> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
final List<SubscriptionEventModelDao> models = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class).getEventsForSubscription(subscriptionId.toString(), context);
- // Remove UNCANCEL events early on as they are not representative of a state transition but are just markers
- final Collection<SubscriptionEventModelDao> filteredModels = Collections2.filter(models, new Predicate<SubscriptionEventModelDao>() {
- @Override
- public boolean apply(@Nullable final SubscriptionEventModelDao input) {
- return input.getUserType() != ApiEventType.UNCANCEL;
- }
- });
- return new ArrayList<SubscriptionBaseEvent>(Collections2.transform(filteredModels, new Function<SubscriptionEventModelDao, SubscriptionBaseEvent>() {
- @Override
- public SubscriptionBaseEvent apply(@Nullable final SubscriptionEventModelDao input) {
- return SubscriptionEventModelDao.toSubscriptionEvent(input);
- }
- }));
+ return filterSubscriptionBaseEvents(models);
}
});
}
+
@Override
public Map<UUID, List<SubscriptionBaseEvent>> getEventsForBundle(final UUID bundleId, final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Map<UUID, List<SubscriptionBaseEvent>>>() {
@@ -455,7 +453,6 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
});
}
-
@Override
public void recreateSubscription(final DefaultSubscriptionBase subscription, final List<SubscriptionBaseEvent> recreateEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -494,7 +491,6 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
});
}
-
@Override
public void cancelSubscription(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent cancelEvent, final InternalCallContext context, final int seqId) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -557,9 +553,9 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
final UUID subscriptionId = subscription.getId();
final List<SubscriptionBaseEvent> changeEventsTweakedWithMigrateBilling = reinsertFutureMigrateBillingEventOnChangeFromTransaction(subscriptionId,
- changeEvents,
- entitySqlDaoWrapperFactory,
- context);
+ changeEvents,
+ entitySqlDaoWrapperFactory,
+ context);
cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
@@ -669,6 +665,31 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
return changeEvents;
}
+ private List<SubscriptionBaseEvent> filterSubscriptionBaseEvents(final List<SubscriptionEventModelDao> models) {
+ final Collection<SubscriptionEventModelDao> filteredModels = Collections2.filter(models, new Predicate<SubscriptionEventModelDao>() {
+ @Override
+ public boolean apply(@Nullable final SubscriptionEventModelDao input) {
+ return input.getUserType() != ApiEventType.UNCANCEL;
+ }
+ });
+ return new ArrayList<SubscriptionBaseEvent>(Collections2.transform(filteredModels, new Function<SubscriptionEventModelDao, SubscriptionBaseEvent>() {
+ @Override
+ public SubscriptionBaseEvent apply(@Nullable final SubscriptionEventModelDao input) {
+ return SubscriptionEventModelDao.toSubscriptionEvent(input);
+ }
+ }));
+ }
+
+ private List<SubscriptionBaseEvent> getEventsForAccountId(final InternalTenantContext context) {
+ return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<SubscriptionBaseEvent>>() {
+ @Override
+ public List<SubscriptionBaseEvent> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ final List<SubscriptionEventModelDao> models = entitySqlDaoWrapperFactory.become(SubscriptionEventSqlDao.class).getByAccountRecordId(context);
+ return filterSubscriptionBaseEvents(models);
+ }
+ });
+ }
+
private void cancelSubscriptionFromTransaction(final DefaultSubscriptionBase subscription, final SubscriptionBaseEvent cancelEvent, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context, final int seqId)
throws EntityPersistenceException {
final UUID subscriptionId = subscription.getId();
@@ -701,7 +722,7 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
}
private SubscriptionEventModelDao findFutureEventFromTransaction(final UUID subscriptionId, final EntitySqlDaoWrapperFactory<EntitySqlDao> dao, final EventType type,
- @Nullable final ApiEventType apiType, final InternalCallContext context) {
+ @Nullable final ApiEventType apiType, final InternalCallContext context) {
SubscriptionEventModelDao futureEvent = null;
final Date now = clock.getUTCNow().toDate();
@@ -711,7 +732,7 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
(apiType == null || apiType == cur.getUserType())) {
if (futureEvent != null) {
throw new SubscriptionBaseError(String.format("Found multiple future events for type %s for subscriptions %s",
- type, subscriptionId.toString()));
+ type, subscriptionId.toString()));
}
futureEvent = cur;
// To check that there is only one such event
@@ -746,7 +767,7 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
bundleInput.add(input);
}
- final List<SubscriptionBase> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), bundleInput, context);
+ final List<SubscriptionBase> reloadedSubscriptions = buildBundleSubscriptions(bundleInput, null, context);
for (final SubscriptionBase cur : reloadedSubscriptions) {
if (cur.getId().equals(input.getId())) {
return cur;
@@ -756,7 +777,7 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
throw new SubscriptionBaseError("Unexpected code path in buildSubscription");
}
- private List<SubscriptionBase> buildBundleSubscriptions(final UUID bundleId, final List<SubscriptionBase> input, final InternalTenantContext context) {
+ private List<SubscriptionBase> buildBundleSubscriptions(final List<SubscriptionBase> input, @Nullable final Multimap<UUID, SubscriptionBaseEvent> eventsForSubscription, final InternalTenantContext context) {
if (input == null || input.size() == 0) {
return Collections.emptyList();
}
@@ -778,7 +799,12 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
SubscriptionBaseEvent futureBaseEvent = null;
final List<SubscriptionBase> result = new ArrayList<SubscriptionBase>(input.size());
for (final SubscriptionBase cur : input) {
- final List<SubscriptionBaseEvent> events = getEventsForSubscription(cur.getId(), context);
+
+
+ final List<SubscriptionBaseEvent> events = eventsForSubscription != null ?
+ (List<SubscriptionBaseEvent>) eventsForSubscription.get(cur.getId()) :
+ getEventsForSubscription(cur.getId(), context);
+
SubscriptionBase reloaded = createSubscriptionForInternalUse(cur, events);
switch (cur.getCategory()) {
@@ -806,15 +832,15 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
if (createCancelEvent && reloaded.getFutureEndDate() == null) {
final DateTime now = clock.getUTCNow();
final SubscriptionBaseEvent addOnCancelEvent = new ApiEventCancel(new ApiEventBuilder()
- .setSubscriptionId(reloaded.getId())
- .setActiveVersion(((DefaultSubscriptionBase) reloaded).getActiveVersion())
- .setProcessedDate(now)
- .setEffectiveDate(futureBaseEvent.getEffectiveDate())
- .setRequestedDate(now)
- .setCreatedDate(futureBaseEvent.getCreatedDate())
- // This event is only there to indicate the ADD_ON is future canceled, but it is not there
- // on disk until the base plan cancellation becomes effective
- .setFromDisk(false));
+ .setSubscriptionId(reloaded.getId())
+ .setActiveVersion(((DefaultSubscriptionBase) reloaded).getActiveVersion())
+ .setProcessedDate(now)
+ .setEffectiveDate(futureBaseEvent.getEffectiveDate())
+ .setRequestedDate(now)
+ .setCreatedDate(futureBaseEvent.getCreatedDate())
+ // This event is only there to indicate the ADD_ON is future canceled, but it is not there
+ // on disk until the base plan cancellation becomes effective
+ .setFromDisk(false));
events.add(addOnCancelEvent);
// Finally reload subscription with full set of events
@@ -873,7 +899,7 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
try {
// Note: we don't send a requested change event here, but a repair event
final RepairSubscriptionInternalEvent busEvent = new DefaultRepairSubscriptionEvent(accountId, bundleId, clock.getUTCNow(),
- context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
+ context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getSqlDao());
} catch (EventBusException e) {
log.warn("Failed to post repair subscription event for bundle " + bundleId, e);
@@ -922,7 +948,6 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
return null;
}
-
//
// Either records a notfication or sends a bus event is operation is immediate
//
@@ -953,7 +978,6 @@ public class DefaultSubscriptionDao implements SubscriptionDao {
context.getUserToken(),
context.getAccountRecordId(), context.getTenantRecordId());
-
eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory.getSqlDao());
} catch (EventBusException e) {
log.warn("Failed to post effective event for subscription " + subscription.getId(), e);