killbill-uncached

entitlement: send event from transaction when setting new

1/12/2016 12:06:26 PM

Details

diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java b/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
index d468dbd..0c2b756 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -115,6 +117,8 @@ public class DefaultBlockingChecker implements BlockingChecker {
     private final SubscriptionBaseInternalApi subscriptionApi;
     private final BlockingStateDao dao;
 
+    private final StatelessBlockingChecker statelessBlockingChecker = new StatelessBlockingChecker();
+
     @Inject
     public DefaultBlockingChecker(final SubscriptionBaseInternalApi subscriptionApi, final BlockingStateDao dao) {
         this.subscriptionApi = subscriptionApi;
@@ -126,7 +130,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
         try {
             subscription = subscriptionApi.getSubscriptionFromId(subscriptionId, context);
             return getBlockedStateSubscription(subscription, upToDate, context);
-        } catch (SubscriptionBaseApiException e) {
+        } catch (final SubscriptionBaseApiException e) {
             throw new BlockingApiException(e, ErrorCode.fromCode(e.getCode()));
         }
     }
@@ -152,7 +156,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
         try {
             bundle = subscriptionApi.getBundleFromId(bundleId, context);
             return getBlockedStateBundle(bundle, upToDate, context);
-        } catch (SubscriptionBaseApiException e) {
+        } catch (final SubscriptionBaseApiException e) {
             throw new BlockingApiException(e, ErrorCode.fromCode(e.getCode()));
         }
     }
@@ -185,15 +189,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
         } else {
             blockableState = ImmutableList.<BlockingState>of();
         }
-        return getBlockedState(blockableState);
-    }
-
-    private DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> currentBlockableStatePerService) {
-        final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
-        for (final BlockingState cur : currentBlockableStatePerService) {
-            result.or(cur);
-        }
-        return result;
+        return statelessBlockingChecker.getBlockedState(blockableState);
     }
 
     @Override
@@ -209,10 +205,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
 
     @Override
     public BlockingAggregator getBlockedStatus(final List<BlockingState> accountEntitlementStates, final List<BlockingState> bundleEntitlementStates, final List<BlockingState> subscriptionEntitlementStates, final InternalTenantContext internalTenantContext) {
-        final DefaultBlockingAggregator result = getBlockedState(subscriptionEntitlementStates);
-        result.or(getBlockedState(bundleEntitlementStates));
-        result.or(getBlockedState(accountEntitlementStates));
-        return result;
+        return statelessBlockingChecker.getBlockedState(accountEntitlementStates, bundleEntitlementStates, subscriptionEntitlementStates);
     }
 
     @Override
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java b/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java
new file mode 100644
index 0000000..495c494
--- /dev/null
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.entitlement.block;
+
+import org.killbill.billing.entitlement.api.BlockingState;
+import org.killbill.billing.entitlement.block.DefaultBlockingChecker.DefaultBlockingAggregator;
+
+public class StatelessBlockingChecker {
+
+    public DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> accountEntitlementStates,
+                                                     final Iterable<BlockingState> bundleEntitlementStates,
+                                                     final Iterable<BlockingState> subscriptionEntitlementStates) {
+        final DefaultBlockingAggregator result = getBlockedState(subscriptionEntitlementStates);
+        result.or(getBlockedState(bundleEntitlementStates));
+        result.or(getBlockedState(accountEntitlementStates));
+        return result;
+    }
+
+    public DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> currentBlockableStatePerService) {
+        final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
+        for (final BlockingState cur : currentBlockableStatePerService) {
+            result.or(cur);
+        }
+        return result;
+    }
+}
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
index 75c4ebb..770f56d 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
@@ -61,12 +61,13 @@ public interface BlockingStateDao extends EntityDao<BlockingStateModelDao, Block
     public List<BlockingState> getBlockingAllForAccountRecordId(InternalTenantContext context);
 
     /**
-     * Sets a new state for a specific service.
+     * Sets a new state for a specific service and send an event if needed
      *
      * @param state   blocking state to set
+     * @param bundleId bundle id of the associated bundle if the blocking state type is SUBSCRIPTION
      * @param context call context
      */
-    public void setBlockingState(BlockingState state, InternalCallContext context);
+    public void setBlockingStateAndPostBlockingTransitionEvent(BlockingState state, UUID bundleId, InternalCallContext context);
 
     /**
      * Unactive the blocking state
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
index a921ba2..6db86ed 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
@@ -18,6 +18,7 @@
 
 package org.killbill.billing.entitlement.dao;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -30,27 +31,49 @@ import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
 import org.killbill.billing.ErrorCode;
+import org.killbill.billing.ObjectType;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.entitlement.DefaultEntitlementService;
+import org.killbill.billing.entitlement.EntitlementService;
+import org.killbill.billing.entitlement.api.BlockingApiException;
 import org.killbill.billing.entitlement.api.BlockingState;
 import org.killbill.billing.entitlement.api.BlockingStateType;
+import org.killbill.billing.entitlement.api.DefaultBlockingTransitionInternalEvent;
 import org.killbill.billing.entitlement.api.EntitlementApiException;
+import org.killbill.billing.entitlement.block.BlockingChecker.BlockingAggregator;
+import org.killbill.billing.entitlement.block.StatelessBlockingChecker;
+import org.killbill.billing.entitlement.engine.core.BlockingTransitionNotificationKey;
+import org.killbill.billing.util.cache.Cachable.CacheType;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
 public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao, BlockingState, EntitlementApiException> implements BlockingStateDao {
 
+    private static final Logger log = LoggerFactory.getLogger(DefaultBlockingStateDao.class);
+
     // Assume the input is blocking states for a single blockable id
     private static final Ordering<BlockingStateModelDao> BLOCKING_STATE_MODEL_DAO_ORDERING = Ordering.<BlockingStateModelDao>from(new Comparator<BlockingStateModelDao>() {
         @Override
@@ -79,11 +102,21 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
     });
 
     private final Clock clock;
+    private final NotificationQueueService notificationQueueService;
+    private final PersistentBus eventBus;
+    private final CacheControllerDispatcher cacheControllerDispatcher;
+    private final NonEntityDao nonEntityDao;
+
+    private final StatelessBlockingChecker statelessBlockingChecker = new StatelessBlockingChecker();
 
-    public DefaultBlockingStateDao(final IDBI dbi, final Clock clock,
+    public DefaultBlockingStateDao(final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
                                    final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
         super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), BlockingStateSqlDao.class);
         this.clock = clock;
+        this.notificationQueueService = notificationQueueService;
+        this.eventBus = eventBus;
+        this.cacheControllerDispatcher = cacheControllerDispatcher;
+        this.nonEntityDao = nonEntityDao;
     }
 
     @Override
@@ -109,20 +142,25 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
             @Override
             public List<BlockingState> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                // Upper bound time limit is now
-                final Date upTo = upToDate.toDate();
-                final List<BlockingStateModelDao> models = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class).getBlockingState(blockableId, upTo, context);
-                final Collection<BlockingStateModelDao> modelsFiltered = filterBlockingStates(models, blockingStateType);
-                return new ArrayList<BlockingState>(Collections2.transform(modelsFiltered, new Function<BlockingStateModelDao, BlockingState>() {
-                    @Override
-                    public BlockingState apply(@Nullable final BlockingStateModelDao src) {
-                        return BlockingStateModelDao.toBlockingState(src);
-                    }
-                }));
+                final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
+                return getBlockingState(sqlDao, blockableId, blockingStateType, upToDate, context);
             }
         });
     }
 
+    private List<BlockingState> getBlockingState(final BlockingStateSqlDao sqlDao, final UUID blockableId, final BlockingStateType blockingStateType, final DateTime upToDate, final InternalTenantContext context) {
+        final Date upTo = upToDate.toDate();
+        final List<BlockingStateModelDao> models = sqlDao.getBlockingState(blockableId, upTo, context);
+        final Collection<BlockingStateModelDao> modelsFiltered = filterBlockingStates(models, blockingStateType);
+        return new ArrayList<BlockingState>(Collections2.transform(modelsFiltered,
+                                                                   new Function<BlockingStateModelDao, BlockingState>() {
+                                                                       @Override
+                                                                       public BlockingState apply(@Nullable final BlockingStateModelDao src) {
+                                                                           return BlockingStateModelDao.toBlockingState(src);
+                                                                       }
+                                                                   }));
+    }
+
     @Override
     public List<BlockingState> getBlockingAllForAccountRecordId(final InternalTenantContext context) {
         return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
@@ -141,13 +179,16 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
     }
 
     @Override
-    public void setBlockingState(final BlockingState state, final InternalCallContext context) {
+    public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                final DateTime upToDate = clock.getUTCNow();
+                final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
+                final BlockingAggregator previousState = getBlockedStatus(sqlDao, entitySqlDaoWrapperFactory.getHandle(), state.getBlockedId(), state.getType(), bundleId, upToDate, context);
+
                 final BlockingStateModelDao newBlockingStateModelDao = new BlockingStateModelDao(state, context);
 
-                final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
                 // Get all blocking states for that blocked id and service
                 final List<BlockingStateModelDao> allForBlockedItAndService = sqlDao.getBlockingHistoryForService(state.getBlockedId(), state.getService(), context);
 
@@ -185,11 +226,111 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
                     sqlDao.create(newBlockingStateModelDao, context);
                 }
 
+                final BlockingAggregator currentState = getBlockedStatus(sqlDao, entitySqlDaoWrapperFactory.getHandle(), state.getBlockedId(), state.getType(), bundleId, upToDate, context);
+                if (previousState != null && currentState != null) {
+                    recordBusOrFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+                                                                 state.getId(),
+                                                                 state.getEffectiveDate(),
+                                                                 state.getBlockedId(),
+                                                                 state.getType(),
+                                                                 state.getService(),
+                                                                 previousState,
+                                                                 currentState,
+                                                                 context);
+                }
+
                 return null;
             }
         });
     }
 
+    private BlockingAggregator getBlockedStatus(final BlockingStateSqlDao sqlDao, final Handle handle, final UUID blockableId, final BlockingStateType type, @Nullable final UUID bundleId, final DateTime upToDate, final InternalTenantContext context) throws BlockingApiException {
+        final List<BlockingState> accountBlockingStates;
+        final List<BlockingState> bundleBlockingStates;
+        final List<BlockingState> subscriptionBlockingStates;
+        if (type == BlockingStateType.SUBSCRIPTION) {
+            final UUID accountId = nonEntityDao.retrieveIdFromObjectInTransaction(context.getAccountRecordId(), ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.OBJECT_ID), handle);
+            accountBlockingStates = getBlockingState(sqlDao, accountId, BlockingStateType.ACCOUNT, upToDate, context);
+            bundleBlockingStates = getBlockingState(sqlDao, bundleId, BlockingStateType.SUBSCRIPTION_BUNDLE, upToDate, context);
+            subscriptionBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.SUBSCRIPTION, upToDate, context);
+        } else if (type == BlockingStateType.SUBSCRIPTION_BUNDLE) {
+            final UUID accountId = nonEntityDao.retrieveIdFromObjectInTransaction(context.getAccountRecordId(), ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.OBJECT_ID), handle);
+            accountBlockingStates = getBlockingState(sqlDao, accountId, BlockingStateType.ACCOUNT, upToDate, context);
+            bundleBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.SUBSCRIPTION_BUNDLE, upToDate, context);
+            subscriptionBlockingStates = ImmutableList.<BlockingState>of();
+        } else { // BlockingStateType.ACCOUNT {
+            accountBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.ACCOUNT, upToDate, context);
+            bundleBlockingStates = ImmutableList.<BlockingState>of();
+            subscriptionBlockingStates = ImmutableList.<BlockingState>of();
+        }
+        return statelessBlockingChecker.getBlockedState(accountBlockingStates, bundleBlockingStates, subscriptionBlockingStates);
+    }
+
+    private void recordBusOrFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+                                                              final UUID blockingStateId,
+                                                              final DateTime effectiveDate,
+                                                              final UUID blockableId,
+                                                              final BlockingStateType type,
+                                                              final String serviceName,
+                                                              final BlockingAggregator previousState,
+                                                              final BlockingAggregator currentState,
+                                                              final InternalCallContext context) {
+        final boolean isTransitionToBlockedBilling = !previousState.isBlockBilling() && currentState.isBlockBilling();
+        final boolean isTransitionToUnblockedBilling = previousState.isBlockBilling() && !currentState.isBlockBilling();
+
+        final boolean isTransitionToBlockedEntitlement = !previousState.isBlockEntitlement() && currentState.isBlockEntitlement();
+        final boolean isTransitionToUnblockedEntitlement = previousState.isBlockEntitlement() && !currentState.isBlockEntitlement();
+
+        if (effectiveDate.compareTo(clock.getUTCNow()) > 0) {
+            // Add notification entry to send the bus event at the effective date
+            final NotificationEvent notificationEvent = new BlockingTransitionNotificationKey(blockingStateId, blockableId, type,
+                                                                                              isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
+                                                                                              isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement);
+            recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationEvent, context);
+        } else {
+            // TODO Do we want to send a DefaultEffectiveEntitlementEvent for entitlement specific blocking states?
+            // Don't post if nothing has changed for entitlement-service
+            if (!serviceName.equals(EntitlementService.ENTITLEMENT_SERVICE_NAME) || !previousState.equals(currentState)) {
+                final BusEvent event = new DefaultBlockingTransitionInternalEvent(blockableId, type,
+                                                                                  isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
+                                                                                  isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement,
+                                                                                  context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
+                notifyBusFromTransaction(entitySqlDaoWrapperFactory, event);
+            } else {
+                log.debug("Skipping event for service {} (previousState={}, currentState={})", serviceName, previousState, currentState);
+            }
+
+        }
+    }
+
+    private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+                                                         final DateTime effectiveDate,
+                                                         final NotificationEvent notificationEvent,
+                                                         final InternalCallContext context) {
+        try {
+            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME,
+                                                                                                           DefaultEntitlementService.NOTIFICATION_QUEUE_NAME);
+            subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(),
+                                                                           effectiveDate,
+                                                                           notificationEvent,
+                                                                           context.getUserToken(),
+                                                                           context.getAccountRecordId(),
+                                                                           context.getTenantRecordId());
+        } catch (final NoSuchNotificationQueue e) {
+            throw new RuntimeException(e);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void notifyBusFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final BusEvent event) {
+        try {
+            eventBus.postFromTransaction(event, entitySqlDaoWrapperFactory.getHandle().getConnection());
+        } catch (final EventBusException e) {
+            log.warn("Failed to post event {}", e);
+        }
+    }
+
     @Override
     public void unactiveBlockingState(final UUID id, final InternalCallContext context) {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
index cef565e..3c228f2 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -34,7 +36,9 @@ import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseBundle;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.bus.api.PersistentBus;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
 import org.skife.jdbi.v2.IDBI;
 
 import com.google.common.collect.ImmutableList;
@@ -42,9 +46,9 @@ import com.google.common.collect.ImmutableList;
 public class OptimizedProxyBlockingStateDao extends ProxyBlockingStateDao {
 
     public OptimizedProxyBlockingStateDao(final EventsStreamBuilder eventsStreamBuilder, final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
-                                          final IDBI dbi, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher,
-                                          final NonEntityDao nonEntityDao) {
-        super(eventsStreamBuilder, subscriptionBaseInternalApi, dbi, clock, cacheControllerDispatcher, nonEntityDao);
+                                          final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
+                                          final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+        super(eventsStreamBuilder, subscriptionBaseInternalApi, dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
     }
 
     /**
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
index e575ecb..4e6a6b9 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
@@ -47,7 +47,9 @@ import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.customfield.ShouldntHappenException;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Pagination;
+import org.killbill.bus.api.PersistentBus;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,12 +166,12 @@ public class ProxyBlockingStateDao implements BlockingStateDao {
 
     @Inject
     public ProxyBlockingStateDao(final EventsStreamBuilder eventsStreamBuilder, final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
-                                 final IDBI dbi, final Clock clock,
+                                 final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
                                  final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
         this.eventsStreamBuilder = eventsStreamBuilder;
         this.subscriptionInternalApi = subscriptionBaseInternalApi;
         this.clock = clock;
-        this.delegate = new DefaultBlockingStateDao(dbi, clock, cacheControllerDispatcher, nonEntityDao);
+        this.delegate = new DefaultBlockingStateDao(dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
     }
 
     @Override
@@ -229,8 +231,8 @@ public class ProxyBlockingStateDao implements BlockingStateDao {
     }
 
     @Override
-    public void setBlockingState(final BlockingState state, final InternalCallContext context) {
-        delegate.setBlockingState(state, context);
+    public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
+        delegate.setBlockingStateAndPostBlockingTransitionEvent(state, bundleId, context);
     }
 
     @Override
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
index 8256d96..b0aee5b 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
@@ -18,88 +18,62 @@
 
 package org.killbill.billing.entitlement.engine.core;
 
-import java.io.IOException;
 import java.util.UUID;
 
 import javax.inject.Inject;
 
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.killbill.bus.api.BusEvent;
-import org.killbill.bus.api.PersistentBus;
-import org.killbill.bus.api.PersistentBus.EventBusException;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
-import org.killbill.clock.Clock;
 import org.killbill.billing.entitlement.DefaultEntitlementService;
-import org.killbill.billing.entitlement.EntitlementService;
-import org.killbill.billing.entitlement.api.BlockingApiException;
 import org.killbill.billing.entitlement.api.BlockingState;
 import org.killbill.billing.entitlement.api.BlockingStateType;
-import org.killbill.billing.entitlement.api.DefaultBlockingTransitionInternalEvent;
 import org.killbill.billing.entitlement.api.DefaultEntitlementApi;
-import org.killbill.billing.entitlement.block.BlockingChecker;
-import org.killbill.billing.entitlement.block.BlockingChecker.BlockingAggregator;
 import org.killbill.billing.entitlement.dao.BlockingStateDao;
-import org.killbill.notificationq.api.NotificationEvent;
-import org.killbill.notificationq.api.NotificationQueue;
-import org.killbill.notificationq.api.NotificationQueueService;
-import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
+import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
+import org.killbill.notificationq.api.NotificationQueueService;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
 public class EntitlementUtils {
 
-    private static final Logger log = LoggerFactory.getLogger(EntitlementUtils.class);
+    protected final NotificationQueueService notificationQueueService;
 
     private final BlockingStateDao dao;
-    private final BlockingChecker blockingChecker;
     private final SubscriptionBaseInternalApi subscriptionBaseInternalApi;
-    private final PersistentBus eventBus;
-    private final Clock clock;
-    protected final NotificationQueueService notificationQueueService;
 
     @Inject
-    public EntitlementUtils(final BlockingStateDao dao, final BlockingChecker blockingChecker,
-                            final PersistentBus eventBus, final Clock clock,
+    public EntitlementUtils(final BlockingStateDao dao,
                             final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
                             final NotificationQueueService notificationQueueService) {
         this.dao = dao;
-        this.blockingChecker = blockingChecker;
-        this.eventBus = eventBus;
-        this.clock = clock;
         this.subscriptionBaseInternalApi = subscriptionBaseInternalApi;
         this.notificationQueueService = notificationQueueService;
     }
 
     /**
-     * Wrapper around BlockingStateDao#setBlockingState which will send an event on the bus if needed
-     *
      * @param state   new state to store
      * @param context call context
      */
     public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final InternalCallContext context) {
-        final BlockingAggregator previousState = getBlockingStateFor(state.getBlockedId(), state.getType(), clock.getUTCNow(), context);
-
-        dao.setBlockingState(state, context);
-
-        final BlockingAggregator currentState = getBlockingStateFor(state.getBlockedId(), state.getType(), state.getEffectiveDate(), context);
-        if (previousState != null && currentState != null) {
-            postBlockingTransitionEvent(state.getId(), state.getEffectiveDate(), state.getBlockedId(), state.getType(), state.getService(), previousState, currentState, context);
+        UUID bundleId = null;
+        if (state.getType() == BlockingStateType.SUBSCRIPTION) {
+            try {
+                bundleId = subscriptionBaseInternalApi.getSubscriptionFromId(state.getBlockedId(), context).getBundleId();
+            } catch (final SubscriptionBaseApiException e) {
+                throw new RuntimeException(e);
+            }
         }
+        dao.setBlockingStateAndPostBlockingTransitionEvent(state, bundleId, context);
     }
 
     /**
-     *
-     * @param externalKey the bundle externalKey
+     * @param externalKey   the bundle externalKey
      * @param tenantContext the context
      * @return the id of the first subscription (BASE or STANDALONE) that is still active for that key
      */
-    public UUID getFirstActiveSubscriptionIdForKeyOrNull(final String externalKey, final InternalTenantContext tenantContext)  {
+    public UUID getFirstActiveSubscriptionIdForKeyOrNull(final String externalKey, final InternalTenantContext tenantContext) {
 
         final Iterable<UUID> nonAddonUUIDs = subscriptionBaseInternalApi.getNonAOSubscriptionIdsForKey(externalKey, tenantContext);
         return Iterables.tryFind(nonAddonUUIDs, new Predicate<UUID>() {
@@ -110,70 +84,4 @@ public class EntitlementUtils {
             }
         }).orNull();
     }
-
-
-    private BlockingAggregator getBlockingStateFor(final UUID blockableId, final BlockingStateType type, final DateTime effectiveDate, final InternalCallContext context) {
-        try {
-            return blockingChecker.getBlockedStatus(blockableId, type, effectiveDate, context);
-        } catch (BlockingApiException e) {
-            log.warn("Failed to retrieve blocking state for {} {}", blockableId, type);
-            return null;
-        }
-    }
-
-    private void postBlockingTransitionEvent(final UUID blockingStateId, final DateTime effectiveDate, final UUID blockableId, final BlockingStateType type,
-                                             final String serviceName, final BlockingAggregator previousState, final BlockingAggregator currentState,
-                                             final InternalCallContext context) {
-        final boolean isTransitionToBlockedBilling = !previousState.isBlockBilling() && currentState.isBlockBilling();
-        final boolean isTransitionToUnblockedBilling = previousState.isBlockBilling() && !currentState.isBlockBilling();
-
-        final boolean isTransitionToBlockedEntitlement = !previousState.isBlockEntitlement() && currentState.isBlockEntitlement();
-        final boolean isTransitionToUnblockedEntitlement = previousState.isBlockEntitlement() && !currentState.isBlockEntitlement();
-
-        if (effectiveDate.compareTo(clock.getUTCNow()) > 0) {
-            // Add notification entry to send the bus event at the effective date
-            final NotificationEvent notificationEvent = new BlockingTransitionNotificationKey(blockingStateId, blockableId, type,
-                                                                                              isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
-                                                                                              isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement);
-            recordFutureNotification(effectiveDate, notificationEvent, context);
-        } else {
-            // TODO Do we want to send a DefaultEffectiveEntitlementEvent for entitlement specific blocking states?
-
-            // Don't post if nothing has changed for entitlement-service
-           if (! serviceName.equals(EntitlementService.ENTITLEMENT_SERVICE_NAME) || ! previousState.equals(currentState)) {
-                final BusEvent event = new DefaultBlockingTransitionInternalEvent(blockableId, type,
-                                                                                  isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
-                                                                                  isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement,
-                                                                                  context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
-                postBusEvent(event);
-           } else {
-               System.out.println("**********   SKIPPING EVENT ");
-           }
-
-        }
-    }
-
-    private void postBusEvent(final BusEvent event) {
-        try {
-            // TODO STEPH Ideally we would like to post from transaction when we inserted the new blocking state, but new state would have to be recalculated from transaction which is
-            // difficult without the help of BlockingChecker -- which itself relies on dao. Other alternative is duplicating the logic, or refactoring the DAO to export higher level api.
-            eventBus.post(event);
-        } catch (EventBusException e) {
-            log.warn("Failed to post event {}", e);
-        }
-    }
-
-    private void recordFutureNotification(final DateTime effectiveDate,
-                                          final NotificationEvent notificationEvent,
-                                          final InternalCallContext context) {
-        try {
-            final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME,
-                                                                                                           DefaultEntitlementService.NOTIFICATION_QUEUE_NAME);
-            subscriptionEventQueue.recordFutureNotification(effectiveDate, notificationEvent, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
-        } catch (NoSuchNotificationQueue e) {
-            throw new RuntimeException(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
 }
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
index afd51c8..b090a03 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -53,7 +55,9 @@ import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.bus.api.PersistentBus;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
 import org.skife.jdbi.v2.IDBI;
 
 import com.google.common.base.Objects;
@@ -75,6 +79,7 @@ public class EventsStreamBuilder {
     @Inject
     public EventsStreamBuilder(final AccountInternalApi accountInternalApi, final SubscriptionBaseInternalApi subscriptionInternalApi,
                                final BlockingChecker checker, final IDBI dbi, final Clock clock,
+                               final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
                                final CacheControllerDispatcher cacheControllerDispatcher,
                                final NonEntityDao nonEntityDao,
                                final InternalCallContextFactory internalCallContextFactory) {
@@ -84,8 +89,8 @@ public class EventsStreamBuilder {
         this.clock = clock;
         this.internalCallContextFactory = internalCallContextFactory;
 
-        this.defaultBlockingStateDao = new DefaultBlockingStateDao(dbi, clock, cacheControllerDispatcher, nonEntityDao);
-        this.blockingStateDao = new OptimizedProxyBlockingStateDao(this, subscriptionInternalApi, dbi, clock, cacheControllerDispatcher, nonEntityDao);
+        this.defaultBlockingStateDao = new DefaultBlockingStateDao(dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
+        this.blockingStateDao = new OptimizedProxyBlockingStateDao(this, subscriptionInternalApi, dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
     }
 
     public EventsStream refresh(final EventsStream eventsStream, final TenantContext tenantContext) throws EntitlementApiException {
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java b/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
index 8689df2..37c0e6b 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
@@ -73,17 +73,17 @@ public class TestBlockingChecker extends EntitlementTestSuiteNoDB {
 
     private void setStateBundle(final boolean bC, final boolean bE, final boolean bB) {
         final BlockingState bundleState = new DefaultBlockingState(bundle.getId(), BlockingStateType.SUBSCRIPTION_BUNDLE,"state", "test-service", bC, bE, bB, clock.getUTCNow());
-        blockingStateDao.setBlockingState(bundleState, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(bundleState, null, internalCallContext);
     }
 
     private void setStateAccount(final boolean bC, final boolean bE, final boolean bB) {
         final BlockingState accountState = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state", "test-service", bC, bE, bB, clock.getUTCNow());
-        blockingStateDao.setBlockingState(accountState, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(accountState, null, internalCallContext);
     }
 
     private void setStateSubscription(final boolean bC, final boolean bE, final boolean bB) {
         final BlockingState subscriptionState = new DefaultBlockingState(subscription.getId(), BlockingStateType.SUBSCRIPTION, "state", "test-service", bC, bE, bB, clock.getUTCNow());
-        blockingStateDao.setBlockingState(subscriptionState, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(subscriptionState, subscription.getBundleId(), internalCallContext);
     }
 
     @Test(groups = "fast")
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
index 52ff2f3..4fc146b 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
@@ -84,7 +84,7 @@ public class MockBlockingStateDao extends MockEntityDaoBase<BlockingStateModelDa
     }
 
     @Override
-    public synchronized void setBlockingState(final BlockingState state, final InternalCallContext context) {
+    public synchronized void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
         if (blockingStates.get(state.getBlockedId()) == null) {
             blockingStates.put(state.getBlockedId(), new ArrayList<BlockingState>());
         }
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
index b5addb7..ba25a23 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
@@ -55,13 +55,13 @@ public class TestBlockingDao extends EntitlementTestSuiteWithEmbeddedDB {
         clock.setDay(new LocalDate(2012, 4, 1));
 
         final BlockingState state1 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName, service, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
-        blockingStateDao.setBlockingState(state1, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state1, null, internalCallContext);
 
         clock.addDays(1);
 
         final String overdueStateName2 = "NoReallyThisCantGoOn";
         final BlockingState state2 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName2, service, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
-        blockingStateDao.setBlockingState(state2, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state2, null, internalCallContext);
 
         Assert.assertEquals(blockingStateDao.getBlockingStateForService(uuid, BlockingStateType.ACCOUNT, service, internalCallContext).getStateName(), state2.getStateName());
 
@@ -83,14 +83,14 @@ public class TestBlockingDao extends EntitlementTestSuiteWithEmbeddedDB {
         final boolean blockBilling = false;
 
         final BlockingState state1 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName, service1, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
-        blockingStateDao.setBlockingState(state1, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state1, null, internalCallContext);
         clock.setDeltaFromReality(1000 * 3600 * 24);
 
         final String service2 = "TEST2";
 
         final String overdueStateName2 = "NoReallyThisCantGoOn";
         final BlockingState state2 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName2, service2, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
-        blockingStateDao.setBlockingState(state2, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state2, null, internalCallContext);
 
         final List<BlockingState> history2 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(history2.size(), 2);
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
index bb066b4..8935cdd 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
@@ -74,7 +74,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         // Set a state
         final DateTime stateDateTime = new DateTime(2013, 5, 6, 10, 11, 12, DateTimeZone.UTC);
         final BlockingState blockingState = new DefaultBlockingState(entitlement.getId(), type, state, service, false, false, false, stateDateTime);
-        blockingStateDao.setBlockingState(blockingState, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState, entitlement.getBundleId(), internalCallContext);
 
         Assert.assertEquals(blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext).size(), 1);
     }
@@ -97,7 +97,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         // Set a state for service A
         final DateTime stateDateTime = new DateTime(2013, 5, 6, 10, 11, 12, DateTimeZone.UTC);
         final BlockingState blockingState1 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime);
-        blockingStateDao.setBlockingState(blockingState1, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState1, null, internalCallContext);
         final List<BlockingState> blockingStates1 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates1.size(), 1);
         Assert.assertEquals(blockingStates1.get(0).getBlockedId(), blockableId);
@@ -106,7 +106,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         Assert.assertEquals(blockingStates1.get(0).getEffectiveDate(), stateDateTime);
 
         // Set the same state again - no change
-        blockingStateDao.setBlockingState(blockingState1, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState1, null, internalCallContext);
         final List<BlockingState> blockingStates2 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates2.size(), 1);
         Assert.assertEquals(blockingStates2.get(0).getBlockedId(), blockableId);
@@ -116,7 +116,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
 
         // Set the state for service B
         final BlockingState blockingState2 = new DefaultBlockingState(blockableId, type, state, serviceB, false, false, false, stateDateTime);
-        blockingStateDao.setBlockingState(blockingState2, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState2, null, internalCallContext);
         final List<BlockingState> blockingStates3 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates3.size(), 2);
         Assert.assertEquals(blockingStates3.get(0).getBlockedId(), blockableId);
@@ -131,7 +131,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         // Set the state for service A in the future - there should be no change (already effective)
         final DateTime stateDateTime2 = new DateTime(2013, 6, 6, 10, 11, 12, DateTimeZone.UTC);
         final BlockingState blockingState3 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime2);
-        blockingStateDao.setBlockingState(blockingState3, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState3, null, internalCallContext);
         final List<BlockingState> blockingStates4 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates4.size(), 2);
         Assert.assertEquals(blockingStates4.get(0).getBlockedId(), blockableId);
@@ -146,7 +146,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         // Set the state for service A in the past - the new effective date should be respected
         final DateTime stateDateTime3 = new DateTime(2013, 2, 6, 10, 11, 12, DateTimeZone.UTC);
         final BlockingState blockingState4 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime3);
-        blockingStateDao.setBlockingState(blockingState4, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState4, null, internalCallContext);
         final List<BlockingState> blockingStates5 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates5.size(), 2);
         Assert.assertEquals(blockingStates5.get(0).getBlockedId(), blockableId);
@@ -161,7 +161,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
         // Set a new state for service A
         final DateTime state2DateTime = new DateTime(2013, 12, 6, 10, 11, 12, DateTimeZone.UTC);
         final BlockingState blockingState5 = new DefaultBlockingState(blockableId, type, state2, serviceA, false, false, false, state2DateTime);
-        blockingStateDao.setBlockingState(blockingState5, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState5, null, internalCallContext);
         final List<BlockingState> blockingStates6 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
         Assert.assertEquals(blockingStates6.size(), 3);
         Assert.assertEquals(blockingStates6.get(0).getBlockedId(), blockableId);
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
index a9f10f7..e6e0041 100644
--- a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
@@ -182,8 +182,8 @@ public class TestBillingApi extends JunctionTestSuiteNoDB {
 
         final Account account = createAccount(32);
 
-        blockingStateDao.setBlockingState(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, now.plusDays(1)), internalCallContext);
-        blockingStateDao.setBlockingState(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, now.plusDays(2)), internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, now.plusDays(1)), null, internalCallContext);
+        blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, now.plusDays(2)), null, internalCallContext);
 
         final SortedSet<BillingEvent> events = billingInternalApi.getBillingEventsForAccountAndUpdateAccountBCD(account.getId(), null, internalCallContext);