killbill-aplcache

entitlement: refactor DefaultEntitlementApi Introduce

11/8/2013 1:22:11 PM

Details

diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
index 98cbb44..bcd3e0f 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
@@ -47,6 +47,7 @@ import com.ning.billing.entitlement.dao.BlockingStateDao;
 import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
 import com.ning.billing.entitlement.engine.core.EntitlementNotificationKeyAction;
 import com.ning.billing.entitlement.engine.core.EntitlementUtils;
+import com.ning.billing.entitlement.engine.core.EventsStream;
 import com.ning.billing.entity.EntityBase;
 import com.ning.billing.junction.DefaultBlockingState;
 import com.ning.billing.notificationq.api.NotificationEvent;
@@ -84,6 +85,15 @@ public class DefaultEntitlement extends EntityBase implements Entitlement {
     protected String externalKey;
     protected DateTimeZone accountTimeZone;
 
+    public DefaultEntitlement(final EntitlementDateHelper dateHelper, final EventsStream eventsStream,
+                              final AccountInternalApi accountApi, final EntitlementApi entitlementApi, final SubscriptionBaseInternalApi subscriptionInternalApi, final InternalCallContextFactory internalCallContextFactory,
+                              final BlockingStateDao blockingStateDao, final Clock clock, final BlockingChecker checker, final NotificationQueueService notificationQueueService,
+                              final EntitlementUtils entitlementUtils) {
+        this(dateHelper, eventsStream.getSubscription(), eventsStream.getAccount().getId(), eventsStream.getBundle().getExternalKey(),
+             eventsStream.getEntitlementState(), eventsStream.getEntitlementEffectiveEndDate(), eventsStream.getAccount().getTimeZone(),
+             accountApi, entitlementApi, subscriptionInternalApi, internalCallContextFactory, blockingStateDao, clock, checker, notificationQueueService, entitlementUtils);
+    }
+
     public DefaultEntitlement(final EntitlementDateHelper dateHelper, final SubscriptionBase subscriptionBase, final UUID accountId,
                               final String externalKey, final EntitlementState state, final LocalDate effectiveEndDate, final DateTimeZone accountTimeZone,
                               final AccountInternalApi accountApi, final EntitlementApi entitlementApi, final SubscriptionBaseInternalApi subscriptionInternalApi, final InternalCallContextFactory internalCallContextFactory,
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlementApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlementApi.java
index c3cc501..a39d224 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlementApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlementApi.java
@@ -16,17 +16,14 @@
 
 package com.ning.billing.entitlement.api;
 
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,15 +39,15 @@ import com.ning.billing.callcontext.InternalCallContext;
 import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.catalog.api.BillingActionPolicy;
 import com.ning.billing.catalog.api.PlanPhaseSpecifier;
-import com.ning.billing.catalog.api.ProductCategory;
 import com.ning.billing.clock.Clock;
 import com.ning.billing.entitlement.EntitlementService;
 import com.ning.billing.entitlement.EntitlementTransitionType;
 import com.ning.billing.entitlement.api.Entitlement.EntitlementState;
 import com.ning.billing.entitlement.block.BlockingChecker;
-import com.ning.billing.entitlement.block.BlockingChecker.BlockingAggregator;
 import com.ning.billing.entitlement.dao.BlockingStateDao;
 import com.ning.billing.entitlement.engine.core.EntitlementUtils;
+import com.ning.billing.entitlement.engine.core.EventsStream;
+import com.ning.billing.entitlement.engine.core.EventsStreamBuilder;
 import com.ning.billing.junction.DefaultBlockingState;
 import com.ning.billing.notificationq.api.NotificationQueueService;
 import com.ning.billing.subscription.api.SubscriptionBase;
@@ -64,8 +61,10 @@ import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.TenantContext;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 public class DefaultEntitlementApi implements EntitlementApi {
 
@@ -84,14 +83,16 @@ public class DefaultEntitlementApi implements EntitlementApi {
     private final BlockingStateDao blockingStateDao;
     private final EntitlementDateHelper dateHelper;
     private final PersistentBus eventBus;
+    private final EventsStreamBuilder eventsStreamBuilder;
+    private final EntitlementUtils entitlementUtils;
     protected final NotificationQueueService notificationQueueService;
-    protected final EntitlementUtils entitlementUtils;
 
     @Inject
     public DefaultEntitlementApi(final PersistentBus eventBus, final InternalCallContextFactory internalCallContextFactory,
                                  final SubscriptionBaseTransferApi subscriptionTransferApi, final SubscriptionBaseInternalApi subscriptionInternalApi,
                                  final AccountInternalApi accountApi, final BlockingStateDao blockingStateDao, final Clock clock,
-                                 final BlockingChecker checker, final NotificationQueueService notificationQueueService, final EntitlementUtils entitlementUtils) {
+                                 final BlockingChecker checker, final NotificationQueueService notificationQueueService,
+                                 final EventsStreamBuilder eventsStreamBuilder, final EntitlementUtils entitlementUtils) {
         this.eventBus = eventBus;
         this.internalCallContextFactory = internalCallContextFactory;
         this.subscriptionInternalApi = subscriptionInternalApi;
@@ -101,6 +102,7 @@ public class DefaultEntitlementApi implements EntitlementApi {
         this.checker = checker;
         this.blockingStateDao = blockingStateDao;
         this.notificationQueueService = notificationQueueService;
+        this.eventsStreamBuilder = eventsStreamBuilder;
         this.entitlementUtils = entitlementUtils;
         this.dateHelper = new EntitlementDateHelper(accountApi, clock);
     }
@@ -126,47 +128,33 @@ public class DefaultEntitlementApi implements EntitlementApi {
 
     @Override
     public Entitlement addEntitlement(final UUID bundleId, final PlanPhaseSpecifier planPhaseSpecifier, final LocalDate effectiveDate, final CallContext callContext) throws EntitlementApiException {
-        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(callContext);
-        try {
-            final SubscriptionBaseBundle bundle = subscriptionInternalApi.getBundleFromId(bundleId, context);
-            final SubscriptionBase baseSubscription = subscriptionInternalApi.getBaseSubscription(bundleId, context);
-
-
-            final InternalCallContext contextWithValidAccountRecordId = internalCallContextFactory.createInternalCallContext(bundle.getAccountId(), callContext);
+        final EventsStream eventsStreamForBaseSubscription = eventsStreamBuilder.buildForBaseSubscription(bundleId, callContext);
 
-            final Account account = accountApi.getAccountById(bundle.getAccountId(), context);
-            final LocalDate baseEntitlementEffectiveEndDate = getEffectiveEndDate(bundle.getAccountId(), baseSubscription, account.getTimeZone(), contextWithValidAccountRecordId);
-            // Check if there is a BP and if it is active
-            final EntitlementState baseEntitlementState = getStateForEntitlement(baseEntitlementEffectiveEndDate, baseSubscription, account.getTimeZone(), contextWithValidAccountRecordId);
-            if (baseSubscription.getCategory() != ProductCategory.BASE ||
-                baseEntitlementState != EntitlementState.ACTIVE) {
-                throw new EntitlementApiException(ErrorCode.SUB_GET_NO_SUCH_BASE_SUBSCRIPTION, baseSubscription.getBundleId());
-            }
+        // Check the base entitlement state is active
+        if (!eventsStreamForBaseSubscription.isEntitlementActive()) {
+            throw new EntitlementApiException(ErrorCode.SUB_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
+        }
 
-            // Check if BP is blockedChange
-            final BlockingAggregator blocking = checker.getBlockedStatus(baseSubscription, contextWithValidAccountRecordId);
-            if (blocking.isBlockChange()) {
-                throw new EntitlementApiException(new BlockingApiException(ErrorCode.BLOCK_BLOCKED_ACTION, BlockingChecker.ACTION_CHANGE, BlockingChecker.TYPE_SUBSCRIPTION, baseSubscription.getId().toString()));
-            }
+        // Check the base entitlement state is not blocked
+        if (eventsStreamForBaseSubscription.getCurrentBlockingAggregator().isBlockChange()) {
+            throw new EntitlementApiException(new BlockingApiException(ErrorCode.BLOCK_BLOCKED_ACTION, BlockingChecker.ACTION_CHANGE, BlockingChecker.TYPE_SUBSCRIPTION, eventsStreamForBaseSubscription.getSubscription().getId().toString()));
+        }
 
+        final DateTime requestedDate = dateHelper.fromLocalDateAndReferenceTime(effectiveDate, eventsStreamForBaseSubscription.getSubscription().getStartDate(), eventsStreamForBaseSubscription.getInternalTenantContext());
 
-            final DateTime requestedDate = dateHelper.fromLocalDateAndReferenceTime(effectiveDate, baseSubscription.getStartDate(), contextWithValidAccountRecordId);
-            final SubscriptionBase subscription = subscriptionInternalApi.createSubscription(baseSubscription.getBundleId(), planPhaseSpecifier, requestedDate, context);
+        try {
+            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(callContext);
+            final SubscriptionBase subscription = subscriptionInternalApi.createSubscription(bundleId, planPhaseSpecifier, requestedDate, context);
 
-            return new DefaultEntitlement(dateHelper, subscription, bundle.getAccountId(), bundle.getExternalKey(), EntitlementState.ACTIVE, null, account.getTimeZone(),
+            return new DefaultEntitlement(dateHelper, subscription, eventsStreamForBaseSubscription.getAccount().getId(), eventsStreamForBaseSubscription.getBundle().getExternalKey(), EntitlementState.ACTIVE, null, eventsStreamForBaseSubscription.getAccount().getTimeZone(),
                                           accountApi, this, subscriptionInternalApi, internalCallContextFactory, blockingStateDao, clock, checker, notificationQueueService, entitlementUtils);
         } catch (SubscriptionBaseApiException e) {
             throw new EntitlementApiException(e);
-        } catch (BlockingApiException e) {
-            throw new EntitlementApiException(e);
-        } catch (AccountApiException e) {
-            throw new EntitlementApiException(e);
         }
     }
 
     @Override
     public List<EntitlementAOStatusDryRun> getDryRunStatusForChange(final UUID bundleId, final String targetProductName, final LocalDate effectiveDate, final TenantContext context) throws EntitlementApiException {
-
         final InternalTenantContext internalContext = internalCallContextFactory.createInternalTenantContext(context);
         try {
             final SubscriptionBaseBundle bundle = subscriptionInternalApi.getBundleFromId(bundleId, internalContext);
@@ -182,162 +170,56 @@ public class DefaultEntitlementApi implements EntitlementApi {
 
     @Override
     public Entitlement getEntitlementForId(final UUID uuid, final TenantContext tenantContext) throws EntitlementApiException {
-        final InternalTenantContext context = internalCallContextFactory.createInternalTenantContext(tenantContext);
-        try {
-            final SubscriptionBase subscription = subscriptionInternalApi.getSubscriptionFromId(uuid, context);
-            final SubscriptionBaseBundle bundle = subscriptionInternalApi.getBundleFromId(subscription.getBundleId(), context);
-
-            final Account account = accountApi.getAccountById(bundle.getAccountId(), context);
-
-            final InternalTenantContext contextWithValidAccountRecordId = internalCallContextFactory.createInternalTenantContext(account.getId(), tenantContext);
-            final LocalDate entitlementEffectiveEndDate = getEffectiveEndDate(bundle.getAccountId(), subscription, account.getTimeZone(), contextWithValidAccountRecordId);
-            final EntitlementState entitlementState = getStateForEntitlement(entitlementEffectiveEndDate, subscription, account.getTimeZone(), contextWithValidAccountRecordId);
-
-
-            return new DefaultEntitlement(dateHelper, subscription, bundle.getAccountId(), bundle.getExternalKey(), entitlementState, entitlementEffectiveEndDate, account.getTimeZone(),
-                                          accountApi, this, subscriptionInternalApi, internalCallContextFactory, blockingStateDao, clock, checker, notificationQueueService, entitlementUtils);
-        } catch (SubscriptionBaseApiException e) {
-            throw new EntitlementApiException(e);
-        } catch (AccountApiException e) {
-            throw new EntitlementApiException(e);
-        }
+        final EventsStream eventsStream = eventsStreamBuilder.buildForEntitlement(uuid, tenantContext);
+        return new DefaultEntitlement(dateHelper, eventsStream, accountApi, this, subscriptionInternalApi, internalCallContextFactory,
+                                      blockingStateDao, clock, checker, notificationQueueService, entitlementUtils);
     }
 
     @Override
     public List<Entitlement> getAllEntitlementsForBundle(final UUID bundleId, final TenantContext tenantContext) throws EntitlementApiException {
         final InternalTenantContext context = internalCallContextFactory.createInternalTenantContext(tenantContext);
-        try {
-            final SubscriptionBaseBundle bundle = subscriptionInternalApi.getBundleFromId(bundleId, context);
-            final Account account = accountApi.getAccountById(bundle.getAccountId(), context);
-
-            final InternalTenantContext contextWithValidAccountRecordId = internalCallContextFactory.createInternalTenantContext(account.getId(), tenantContext);
-            return getAllEntitlementsForBundleId(bundleId, bundle.getAccountId(), account.getTimeZone(), bundle.getExternalKey(), contextWithValidAccountRecordId);
-        } catch (SubscriptionBaseApiException e) {
-            throw new EntitlementApiException(e);
-        } catch (AccountApiException e) {
-            throw new EntitlementApiException(e);
-        }
+        return getAllEntitlementsForBundle(subscriptionInternalApi.getSubscriptionsForBundle(bundleId, context), tenantContext);
     }
 
     @Override
     public List<Entitlement> getAllEntitlementsForAccountIdAndExternalKey(final UUID accountId, final String externalKey, final TenantContext tenantContext) throws EntitlementApiException {
-        final InternalTenantContext context = internalCallContextFactory.createInternalTenantContext(accountId, tenantContext);
-        try {
-            final List<SubscriptionBaseBundle> bundles = subscriptionInternalApi.getBundlesForAccountAndKey(accountId, externalKey, context);
-            if (bundles.size() == 0) {
-                return Collections.emptyList();
-            }
-            final Account account = accountApi.getAccountById(bundles.get(0).getAccountId(), context);
-            return getEntitlementsForBundles(bundles, account, context);
-
-        } catch (SubscriptionBaseApiException e) {
-            throw new EntitlementApiException(e);
-        } catch (AccountApiException e) {
-            throw new EntitlementApiException(e);
-        }
+        // getAllEntitlementsForAccountId should be fast (uses account_record_id)
+        return ImmutableList.<Entitlement>copyOf(Iterables.<Entitlement>filter(getAllEntitlementsForAccountId(accountId, tenantContext),
+                                                                               new Predicate<Entitlement>() {
+                                                                                   @Override
+                                                                                   public boolean apply(final Entitlement input) {
+                                                                                       return externalKey.equals(input.getExternalKey());
+                                                                                   }
+                                                                               }));
     }
 
     @Override
     public List<Entitlement> getAllEntitlementsForAccountId(final UUID accountId, final TenantContext tenantContext) throws EntitlementApiException {
         final InternalTenantContext context = internalCallContextFactory.createInternalTenantContext(accountId, tenantContext);
-        final Account account;
-        try {
-            account = accountApi.getAccountById(accountId, context);
-        } catch (AccountApiException e) {
-            throw new EntitlementApiException(e);
-        }
-        final List<SubscriptionBaseBundle> bundles = subscriptionInternalApi.getBundlesForAccount(account.getId(), context);
-        return getEntitlementsForBundles(bundles, account, context);
-    }
-
-    private List<Entitlement> getEntitlementsForBundles(final List<SubscriptionBaseBundle> bundles, final Account account, final InternalTenantContext context) {
         final Map<UUID, List<SubscriptionBase>> subscriptionsPerBundle = subscriptionInternalApi.getSubscriptionsForAccount(context);
+
         final List<Entitlement> result = new LinkedList<Entitlement>();
-        for (final SubscriptionBaseBundle bundle : bundles) {
-            final List<Entitlement> entitlements = getAllEntitlementsForBundleId(bundle.getAccountId(), account.getTimeZone(), bundle.getExternalKey(), subscriptionsPerBundle.get(bundle.getId()), context);
+        for (final UUID bundleId : subscriptionsPerBundle.keySet()) {
+            final List<Entitlement> entitlements = getAllEntitlementsForBundle(subscriptionsPerBundle.get(bundleId), tenantContext);
             result.addAll(entitlements);
         }
         return result;
     }
 
-    private List<Entitlement> getAllEntitlementsForBundleId(final UUID bundleId, final UUID accountId, final DateTimeZone accountTimeZone, final String externalKey, final InternalTenantContext context) throws EntitlementApiException {
-        final List<SubscriptionBase> subscriptions = subscriptionInternalApi.getSubscriptionsForBundle(bundleId, context);
-        return getAllEntitlementsForBundleId(accountId, accountTimeZone, externalKey, subscriptions, context);
+    private List<Entitlement> getAllEntitlementsForBundle(final List<SubscriptionBase> subscriptions, final TenantContext context) {
+        return Lists.transform(subscriptions,
+                               new Function<SubscriptionBase, Entitlement>() {
+                                   @Override
+                                   public Entitlement apply(final SubscriptionBase input) {
+                                       try {
+                                           return getEntitlementForId(input.getId(), context);
+                                       } catch (EntitlementApiException e) {
+                                           throw new RuntimeException("Failed to extract blocking state for subscription " + input.getId().toString());
+                                       }
+                                   }
+                               });
     }
 
-    private List<Entitlement> getAllEntitlementsForBundleId(final UUID accountId, final DateTimeZone accountTimeZone, final String externalKey, final List<SubscriptionBase> subscriptions, final InternalTenantContext context) {
-        final EntitlementApi thisEntitlementApi = this;
-        return ImmutableList.<Entitlement>copyOf(Collections2.transform(subscriptions, new Function<SubscriptionBase, Entitlement>() {
-            @Nullable
-            @Override
-            public Entitlement apply(@Nullable final SubscriptionBase input) {
-
-                final LocalDate effectiveEndDate = getEffectiveEndDate(accountId, input, accountTimeZone, context);
-
-                EntitlementState entitlementState;
-                try {
-                    entitlementState = getStateForEntitlement(effectiveEndDate, input, accountTimeZone, context);
-                } catch (EntitlementApiException e) {
-                    log.warn("Failed to extract blocking state for subscription " + input.getId().toString());
-                    entitlementState = EntitlementState.CANCELLED;
-                }
-
-                return new DefaultEntitlement(dateHelper, input, accountId, externalKey,
-                                              entitlementState,
-                                              effectiveEndDate,
-                                              accountTimeZone,
-                                              accountApi, thisEntitlementApi,
-                                              subscriptionInternalApi, internalCallContextFactory,
-                                              blockingStateDao, clock, checker, notificationQueueService, entitlementUtils);
-            }
-        }));
-    }
-
-    private LocalDate getEffectiveEndDate(final UUID accountId, final SubscriptionBase subscriptionBase, final DateTimeZone accountTimeZone, final InternalTenantContext context) {
-
-        LocalDate result = null;
-        BlockingState lastEntry = null;
-
-        final List<BlockingState> subEntitlementState = blockingStateDao.getBlockingHistoryForService(subscriptionBase.getId(), EntitlementService.ENTITLEMENT_SERVICE_NAME, context);
-        lastEntry = (subEntitlementState.size() > 0) ? subEntitlementState.get(subEntitlementState.size() - 1) : null;
-        if (lastEntry != null && ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
-            result = new LocalDate(lastEntry.getEffectiveDate(), accountTimeZone);
-        }
-
-        final List<BlockingState> bundleEntitlementState = blockingStateDao.getBlockingHistoryForService(subscriptionBase.getBundleId(), EntitlementService.ENTITLEMENT_SERVICE_NAME, context);
-        lastEntry = (bundleEntitlementState.size() > 0) ? bundleEntitlementState.get(bundleEntitlementState.size() - 1) : null;
-        if (lastEntry != null && ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
-            final LocalDate localDate = new LocalDate(lastEntry.getEffectiveDate(), accountTimeZone);
-            result = ((result == null) || (localDate.compareTo(result) < 0)) ? localDate : result;
-        }
-
-        final List<BlockingState> accountEntitlementState = blockingStateDao.getBlockingHistoryForService(accountId, EntitlementService.ENTITLEMENT_SERVICE_NAME, context);
-        lastEntry = (accountEntitlementState.size() > 0) ? accountEntitlementState.get(accountEntitlementState.size() - 1) : null;
-        if (lastEntry != null && ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
-            final LocalDate localDate = new LocalDate(lastEntry.getEffectiveDate(), accountTimeZone);
-            result = ((result == null) || (localDate.compareTo(result) < 0)) ? localDate : result;
-        }
-        return result;
-    }
-
-    private EntitlementState getStateForEntitlement(final LocalDate entitlementEndDate, final SubscriptionBase subscriptionBase, final DateTimeZone accountTimeZone, final InternalTenantContext context) throws EntitlementApiException {
-
-        // Current state for the ENTITLEMENT_SERVICE_NAME is set to cancelled
-        if (entitlementEndDate != null &&
-            entitlementEndDate.compareTo(new LocalDate(clock.getUTCNow(), accountTimeZone)) <= 0) {
-            return EntitlementState.CANCELLED;
-        }
-
-        try {
-            // Gather states across all services and check if one of them is set to 'blockEntitlement'
-            BlockingAggregator blocking = checker.getBlockedStatus(subscriptionBase, context);
-            return blocking != null && blocking.isBlockEntitlement() ? EntitlementState.BLOCKED : EntitlementState.ACTIVE;
-        } catch (BlockingApiException e) {
-            throw new EntitlementApiException(e);
-        }
-    }
-
-
     @Override
     public void pause(final UUID bundleId, final LocalDate localEffectiveDate, final CallContext context) throws EntitlementApiException {
         try {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStream.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStream.java
new file mode 100644
index 0000000..acd9558
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStream.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.engine.core;
+
+import java.util.List;
+
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+
+import com.ning.billing.account.api.Account;
+import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.entitlement.EntitlementService;
+import com.ning.billing.entitlement.api.BlockingState;
+import com.ning.billing.entitlement.api.DefaultEntitlementApi;
+import com.ning.billing.entitlement.api.Entitlement.EntitlementState;
+import com.ning.billing.entitlement.block.BlockingChecker.BlockingAggregator;
+import com.ning.billing.subscription.api.SubscriptionBase;
+import com.ning.billing.subscription.api.SubscriptionBaseTransitionType;
+import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
+import com.ning.billing.subscription.api.user.SubscriptionBaseTransition;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class EventsStream {
+
+    private final Account account;
+    private final SubscriptionBaseBundle bundle;
+    private final List<BlockingState> subscriptionEntitlementStates;
+    private final List<BlockingState> bundleEntitlementStates;
+    private final List<BlockingState> accountEntitlementStates;
+    private final BlockingAggregator blockingAggregator;
+    private final SubscriptionBase subscription;
+    private final InternalTenantContext internalTenantContext;
+    private final DateTime utcNow;
+
+    private LocalDate entitlementEffectiveEndDate;
+    private BlockingState entitlementCancelEvent;
+    private EntitlementState entitlementState;
+
+    public EventsStream(final Account account, final SubscriptionBaseBundle bundle,
+                        final List<BlockingState> subscriptionEntitlementStates, final List<BlockingState> bundleEntitlementStates,
+                        final List<BlockingState> accountEntitlementStates, final BlockingAggregator blockingAggregator,
+                        final SubscriptionBase subscription, final InternalTenantContext contextWithValidAccountRecordId, final DateTime utcNow) {
+        this.account = account;
+        this.bundle = bundle;
+        this.subscriptionEntitlementStates = subscriptionEntitlementStates;
+        this.bundleEntitlementStates = bundleEntitlementStates;
+        this.accountEntitlementStates = accountEntitlementStates;
+        this.blockingAggregator = blockingAggregator;
+        this.subscription = subscription;
+        this.internalTenantContext = contextWithValidAccountRecordId;
+        this.utcNow = utcNow;
+
+        setup();
+    }
+
+    public Account getAccount() {
+        return account;
+    }
+
+    public SubscriptionBaseBundle getBundle() {
+        return bundle;
+    }
+
+    public SubscriptionBase getSubscription() {
+        return subscription;
+    }
+
+    public InternalTenantContext getInternalTenantContext() {
+        return internalTenantContext;
+    }
+
+    public LocalDate getEntitlementEffectiveEndDate() {
+        return entitlementEffectiveEndDate;
+    }
+
+    public BlockingState getEntitlementCancelEvent() {
+        return entitlementCancelEvent;
+    }
+
+    public EntitlementState getEntitlementState() {
+        return entitlementState;
+    }
+
+    public BlockingAggregator getCurrentBlockingAggregator() {
+        return blockingAggregator;
+    }
+
+    public boolean isFutureEntitlementCancelled() {
+        return entitlementCancelEvent != null && entitlementCancelEvent.getEffectiveDate().isAfter(utcNow);
+    }
+
+    public boolean isEntitlementActive() {
+        return entitlementState == EntitlementState.ACTIVE;
+    }
+
+    public boolean isEntitlementCancelled() {
+        return entitlementState == EntitlementState.CANCELLED;
+    }
+
+    public SubscriptionBaseTransition getPendingSubscriptionEvents(final SubscriptionBaseTransitionType... types) {
+        final List<SubscriptionBaseTransitionType> typeList = ImmutableList.<SubscriptionBaseTransitionType>copyOf(types);
+        return Iterables.<SubscriptionBaseTransition>tryFind(subscription.getAllTransitions(),
+                                                             new Predicate<SubscriptionBaseTransition>() {
+                                                                 @Override
+                                                                 public boolean apply(final SubscriptionBaseTransition input) {
+                                                                     return input.getEffectiveTransitionTime().isAfter(utcNow) && typeList.contains(input.getTransitionType());
+                                                                 }
+                                                             }).orNull();
+    }
+
+    private void setup() {
+        computeEntitlementEffectiveEndDate();
+        computeEntitlementCancelEvent();
+        computeStateForEntitlement();
+    }
+
+    private void computeEntitlementEffectiveEndDate() {
+        LocalDate result = null;
+        BlockingState lastEntry;
+
+        lastEntry = (!subscriptionEntitlementStates.isEmpty()) ? subscriptionEntitlementStates.get(subscriptionEntitlementStates.size() - 1) : null;
+        if (lastEntry != null && DefaultEntitlementApi.ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
+            result = new LocalDate(lastEntry.getEffectiveDate(), account.getTimeZone());
+        }
+
+        lastEntry = (!bundleEntitlementStates.isEmpty()) ? bundleEntitlementStates.get(bundleEntitlementStates.size() - 1) : null;
+        if (lastEntry != null && DefaultEntitlementApi.ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
+            final LocalDate localDate = new LocalDate(lastEntry.getEffectiveDate(), account.getTimeZone());
+            result = ((result == null) || (localDate.compareTo(result) < 0)) ? localDate : result;
+        }
+
+        lastEntry = (!accountEntitlementStates.isEmpty()) ? accountEntitlementStates.get(accountEntitlementStates.size() - 1) : null;
+        if (lastEntry != null && DefaultEntitlementApi.ENT_STATE_CANCELLED.equals(lastEntry.getStateName())) {
+            final LocalDate localDate = new LocalDate(lastEntry.getEffectiveDate(), account.getTimeZone());
+            result = ((result == null) || (localDate.compareTo(result) < 0)) ? localDate : result;
+        }
+
+        entitlementEffectiveEndDate = result;
+    }
+
+    private void computeEntitlementCancelEvent() {
+        entitlementCancelEvent = Iterables.<BlockingState>tryFind(subscriptionEntitlementStates,
+                                                                  new Predicate<BlockingState>() {
+                                                                      @Override
+                                                                      public boolean apply(final BlockingState input) {
+                                                                          return EntitlementService.ENTITLEMENT_SERVICE_NAME.equals(input.getService()) &&
+                                                                                 DefaultEntitlementApi.ENT_STATE_CANCELLED.equals(input.getStateName());
+                                                                      }
+                                                                  }).orNull();
+    }
+
+    private void computeStateForEntitlement() {
+        // Current state for the ENTITLEMENT_SERVICE_NAME is set to cancelled
+        if (entitlementEffectiveEndDate != null && entitlementEffectiveEndDate.compareTo(new LocalDate(utcNow, account.getTimeZone())) <= 0) {
+            entitlementState = EntitlementState.CANCELLED;
+        } else {
+            // Gather states across all services and check if one of them is set to 'blockEntitlement'
+            entitlementState = (blockingAggregator != null && blockingAggregator.isBlockEntitlement() ? EntitlementState.BLOCKED : EntitlementState.ACTIVE);
+        }
+    }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java
new file mode 100644
index 0000000..4634b00
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/EventsStreamBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.engine.core;
+
+import java.util.List;
+import java.util.UUID;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.ning.billing.account.api.Account;
+import com.ning.billing.account.api.AccountApiException;
+import com.ning.billing.account.api.AccountInternalApi;
+import com.ning.billing.callcontext.InternalTenantContext;
+import com.ning.billing.clock.Clock;
+import com.ning.billing.entitlement.EntitlementService;
+import com.ning.billing.entitlement.api.BlockingApiException;
+import com.ning.billing.entitlement.api.BlockingState;
+import com.ning.billing.entitlement.api.EntitlementApiException;
+import com.ning.billing.entitlement.block.BlockingChecker;
+import com.ning.billing.entitlement.block.BlockingChecker.BlockingAggregator;
+import com.ning.billing.entitlement.dao.BlockingStateDao;
+import com.ning.billing.subscription.api.SubscriptionBase;
+import com.ning.billing.subscription.api.SubscriptionBaseInternalApi;
+import com.ning.billing.subscription.api.user.SubscriptionBaseApiException;
+import com.ning.billing.subscription.api.user.SubscriptionBaseBundle;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.TenantContext;
+
+@Singleton
+public class EventsStreamBuilder {
+
+    private final AccountInternalApi accountInternalApi;
+    private final SubscriptionBaseInternalApi subscriptionInternalApi;
+    private final BlockingChecker checker;
+    private final BlockingStateDao blockingStateDao;
+    private final Clock clock;
+    private final InternalCallContextFactory internalCallContextFactory;
+
+    @Inject
+    public EventsStreamBuilder(final AccountInternalApi accountInternalApi, final SubscriptionBaseInternalApi subscriptionInternalApi,
+                               final BlockingChecker checker, final BlockingStateDao blockingStateDao,
+                               final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
+        this.accountInternalApi = accountInternalApi;
+        this.subscriptionInternalApi = subscriptionInternalApi;
+        this.checker = checker;
+        this.blockingStateDao = blockingStateDao;
+        this.clock = clock;
+        this.internalCallContextFactory = internalCallContextFactory;
+    }
+
+    public EventsStream buildForBaseSubscription(final UUID bundleId, final TenantContext tenantContext) throws EntitlementApiException {
+        final SubscriptionBaseBundle bundle;
+        final SubscriptionBase subscription;
+        try {
+            final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantContext);
+            bundle = subscriptionInternalApi.getBundleFromId(bundleId, internalTenantContext);
+            subscription = subscriptionInternalApi.getBaseSubscription(bundleId, internalTenantContext);
+        } catch (SubscriptionBaseApiException e) {
+            throw new EntitlementApiException(e);
+        }
+
+        return buildForEntitlement(bundle, subscription, tenantContext);
+    }
+
+    public EventsStream buildForEntitlement(final UUID entitlementId, final TenantContext tenantContext) throws EntitlementApiException {
+        final SubscriptionBase subscription;
+        final SubscriptionBaseBundle bundle;
+        try {
+            final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(tenantContext);
+            subscription = subscriptionInternalApi.getSubscriptionFromId(entitlementId, internalTenantContext);
+            bundle = subscriptionInternalApi.getBundleFromId(subscription.getBundleId(), internalTenantContext);
+        } catch (SubscriptionBaseApiException e) {
+            throw new EntitlementApiException(e);
+        }
+
+        return buildForEntitlement(bundle, subscription, tenantContext);
+    }
+
+    private EventsStream buildForEntitlement(final SubscriptionBaseBundle bundle, final SubscriptionBase subscription, final TenantContext tenantContext) throws EntitlementApiException {
+        final InternalTenantContext contextWithValidAccountRecordId = internalCallContextFactory.createInternalTenantContext(bundle.getAccountId(), tenantContext);
+
+        final Account account;
+        try {
+            account = accountInternalApi.getAccountById(bundle.getAccountId(), contextWithValidAccountRecordId);
+        } catch (AccountApiException e) {
+            throw new EntitlementApiException(e);
+        }
+
+        final List<BlockingState> subscriptionEntitlementStates = blockingStateDao.getBlockingHistoryForService(subscription.getId(), EntitlementService.ENTITLEMENT_SERVICE_NAME, contextWithValidAccountRecordId);
+        final List<BlockingState> bundleEntitlementStates = blockingStateDao.getBlockingHistoryForService(bundle.getId(), EntitlementService.ENTITLEMENT_SERVICE_NAME, contextWithValidAccountRecordId);
+        final List<BlockingState> accountEntitlementStates = blockingStateDao.getBlockingHistoryForService(account.getId(), EntitlementService.ENTITLEMENT_SERVICE_NAME, contextWithValidAccountRecordId);
+
+        final BlockingAggregator blockingAggregator;
+        try {
+            blockingAggregator = checker.getBlockedStatus(subscription, contextWithValidAccountRecordId);
+        } catch (BlockingApiException e) {
+            throw new EntitlementApiException(e);
+        }
+
+        return new EventsStream(account,
+                                bundle,
+                                subscriptionEntitlementStates,
+                                bundleEntitlementStates,
+                                accountEntitlementStates,
+                                blockingAggregator,
+                                subscription,
+                                contextWithValidAccountRecordId,
+                                clock.getUTCNow());
+    }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
index c7d9e26..f639db3 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
@@ -30,6 +30,7 @@ import com.ning.billing.entitlement.block.DefaultBlockingChecker;
 import com.ning.billing.entitlement.dao.BlockingStateDao;
 import com.ning.billing.entitlement.dao.DefaultBlockingStateDao;
 import com.ning.billing.entitlement.engine.core.EntitlementUtils;
+import com.ning.billing.entitlement.engine.core.EventsStreamBuilder;
 import com.ning.billing.glue.EntitlementModule;
 import com.ning.billing.junction.BlockingInternalApi;
 
@@ -50,6 +51,7 @@ public class DefaultEntitlementModule extends AbstractModule implements Entitlem
         installBlockingChecker();
         bind(EntitlementService.class).to(DefaultEntitlementService.class).asEagerSingleton();
         bind(EntitlementUtils.class).asEagerSingleton();
+        bind(EventsStreamBuilder.class).asEagerSingleton();
     }
 
     @Override