killbill-uncached
Changes
entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java 25(+9 -16)
entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java 41(+41 -0)
entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java 167(+154 -13)
entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java 12(+8 -4)
entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java 120(+14 -106)
entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java 11(+8 -3)
Details
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java b/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
index d468dbd..0c2b756 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/block/DefaultBlockingChecker.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -115,6 +117,8 @@ public class DefaultBlockingChecker implements BlockingChecker {
private final SubscriptionBaseInternalApi subscriptionApi;
private final BlockingStateDao dao;
+ private final StatelessBlockingChecker statelessBlockingChecker = new StatelessBlockingChecker();
+
@Inject
public DefaultBlockingChecker(final SubscriptionBaseInternalApi subscriptionApi, final BlockingStateDao dao) {
this.subscriptionApi = subscriptionApi;
@@ -126,7 +130,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
try {
subscription = subscriptionApi.getSubscriptionFromId(subscriptionId, context);
return getBlockedStateSubscription(subscription, upToDate, context);
- } catch (SubscriptionBaseApiException e) {
+ } catch (final SubscriptionBaseApiException e) {
throw new BlockingApiException(e, ErrorCode.fromCode(e.getCode()));
}
}
@@ -152,7 +156,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
try {
bundle = subscriptionApi.getBundleFromId(bundleId, context);
return getBlockedStateBundle(bundle, upToDate, context);
- } catch (SubscriptionBaseApiException e) {
+ } catch (final SubscriptionBaseApiException e) {
throw new BlockingApiException(e, ErrorCode.fromCode(e.getCode()));
}
}
@@ -185,15 +189,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
} else {
blockableState = ImmutableList.<BlockingState>of();
}
- return getBlockedState(blockableState);
- }
-
- private DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> currentBlockableStatePerService) {
- final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
- for (final BlockingState cur : currentBlockableStatePerService) {
- result.or(cur);
- }
- return result;
+ return statelessBlockingChecker.getBlockedState(blockableState);
}
@Override
@@ -209,10 +205,7 @@ public class DefaultBlockingChecker implements BlockingChecker {
@Override
public BlockingAggregator getBlockedStatus(final List<BlockingState> accountEntitlementStates, final List<BlockingState> bundleEntitlementStates, final List<BlockingState> subscriptionEntitlementStates, final InternalTenantContext internalTenantContext) {
- final DefaultBlockingAggregator result = getBlockedState(subscriptionEntitlementStates);
- result.or(getBlockedState(bundleEntitlementStates));
- result.or(getBlockedState(accountEntitlementStates));
- return result;
+ return statelessBlockingChecker.getBlockedState(accountEntitlementStates, bundleEntitlementStates, subscriptionEntitlementStates);
}
@Override
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java b/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java
new file mode 100644
index 0000000..495c494
--- /dev/null
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/block/StatelessBlockingChecker.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016 Groupon, Inc
+ * Copyright 2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.entitlement.block;
+
+import org.killbill.billing.entitlement.api.BlockingState;
+import org.killbill.billing.entitlement.block.DefaultBlockingChecker.DefaultBlockingAggregator;
+
+public class StatelessBlockingChecker {
+
+ public DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> accountEntitlementStates,
+ final Iterable<BlockingState> bundleEntitlementStates,
+ final Iterable<BlockingState> subscriptionEntitlementStates) {
+ final DefaultBlockingAggregator result = getBlockedState(subscriptionEntitlementStates);
+ result.or(getBlockedState(bundleEntitlementStates));
+ result.or(getBlockedState(accountEntitlementStates));
+ return result;
+ }
+
+ public DefaultBlockingAggregator getBlockedState(final Iterable<BlockingState> currentBlockableStatePerService) {
+ final DefaultBlockingAggregator result = new DefaultBlockingAggregator();
+ for (final BlockingState cur : currentBlockableStatePerService) {
+ result.or(cur);
+ }
+ return result;
+ }
+}
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
index 75c4ebb..770f56d 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/BlockingStateDao.java
@@ -61,12 +61,13 @@ public interface BlockingStateDao extends EntityDao<BlockingStateModelDao, Block
public List<BlockingState> getBlockingAllForAccountRecordId(InternalTenantContext context);
/**
- * Sets a new state for a specific service.
+ * Sets a new state for a specific service and send an event if needed
*
* @param state blocking state to set
+ * @param bundleId bundle id of the associated bundle if the blocking state type is SUBSCRIPTION
* @param context call context
*/
- public void setBlockingState(BlockingState state, InternalCallContext context);
+ public void setBlockingStateAndPostBlockingTransitionEvent(BlockingState state, UUID bundleId, InternalCallContext context);
/**
* Unactive the blocking state
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
index a921ba2..6db86ed 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/DefaultBlockingStateDao.java
@@ -18,6 +18,7 @@
package org.killbill.billing.entitlement.dao;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -30,27 +31,49 @@ import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.killbill.billing.ErrorCode;
+import org.killbill.billing.ObjectType;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.entitlement.DefaultEntitlementService;
+import org.killbill.billing.entitlement.EntitlementService;
+import org.killbill.billing.entitlement.api.BlockingApiException;
import org.killbill.billing.entitlement.api.BlockingState;
import org.killbill.billing.entitlement.api.BlockingStateType;
+import org.killbill.billing.entitlement.api.DefaultBlockingTransitionInternalEvent;
import org.killbill.billing.entitlement.api.EntitlementApiException;
+import org.killbill.billing.entitlement.block.BlockingChecker.BlockingAggregator;
+import org.killbill.billing.entitlement.block.StatelessBlockingChecker;
+import org.killbill.billing.entitlement.engine.core.BlockingTransitionNotificationKey;
+import org.killbill.billing.util.cache.Cachable.CacheType;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.dao.NonEntityDao;
import org.killbill.billing.util.entity.dao.EntityDaoBase;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.bus.api.PersistentBus;
+import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao, BlockingState, EntitlementApiException> implements BlockingStateDao {
+ private static final Logger log = LoggerFactory.getLogger(DefaultBlockingStateDao.class);
+
// Assume the input is blocking states for a single blockable id
private static final Ordering<BlockingStateModelDao> BLOCKING_STATE_MODEL_DAO_ORDERING = Ordering.<BlockingStateModelDao>from(new Comparator<BlockingStateModelDao>() {
@Override
@@ -79,11 +102,21 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
});
private final Clock clock;
+ private final NotificationQueueService notificationQueueService;
+ private final PersistentBus eventBus;
+ private final CacheControllerDispatcher cacheControllerDispatcher;
+ private final NonEntityDao nonEntityDao;
+
+ private final StatelessBlockingChecker statelessBlockingChecker = new StatelessBlockingChecker();
- public DefaultBlockingStateDao(final IDBI dbi, final Clock clock,
+ public DefaultBlockingStateDao(final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
super(new EntitySqlDaoTransactionalJdbiWrapper(dbi, clock, cacheControllerDispatcher, nonEntityDao), BlockingStateSqlDao.class);
this.clock = clock;
+ this.notificationQueueService = notificationQueueService;
+ this.eventBus = eventBus;
+ this.cacheControllerDispatcher = cacheControllerDispatcher;
+ this.nonEntityDao = nonEntityDao;
}
@Override
@@ -109,20 +142,25 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
@Override
public List<BlockingState> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
- // Upper bound time limit is now
- final Date upTo = upToDate.toDate();
- final List<BlockingStateModelDao> models = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class).getBlockingState(blockableId, upTo, context);
- final Collection<BlockingStateModelDao> modelsFiltered = filterBlockingStates(models, blockingStateType);
- return new ArrayList<BlockingState>(Collections2.transform(modelsFiltered, new Function<BlockingStateModelDao, BlockingState>() {
- @Override
- public BlockingState apply(@Nullable final BlockingStateModelDao src) {
- return BlockingStateModelDao.toBlockingState(src);
- }
- }));
+ final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
+ return getBlockingState(sqlDao, blockableId, blockingStateType, upToDate, context);
}
});
}
+ private List<BlockingState> getBlockingState(final BlockingStateSqlDao sqlDao, final UUID blockableId, final BlockingStateType blockingStateType, final DateTime upToDate, final InternalTenantContext context) {
+ final Date upTo = upToDate.toDate();
+ final List<BlockingStateModelDao> models = sqlDao.getBlockingState(blockableId, upTo, context);
+ final Collection<BlockingStateModelDao> modelsFiltered = filterBlockingStates(models, blockingStateType);
+ return new ArrayList<BlockingState>(Collections2.transform(modelsFiltered,
+ new Function<BlockingStateModelDao, BlockingState>() {
+ @Override
+ public BlockingState apply(@Nullable final BlockingStateModelDao src) {
+ return BlockingStateModelDao.toBlockingState(src);
+ }
+ }));
+ }
+
@Override
public List<BlockingState> getBlockingAllForAccountRecordId(final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
@@ -141,13 +179,16 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
}
@Override
- public void setBlockingState(final BlockingState state, final InternalCallContext context) {
+ public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+ final DateTime upToDate = clock.getUTCNow();
+ final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
+ final BlockingAggregator previousState = getBlockedStatus(sqlDao, entitySqlDaoWrapperFactory.getHandle(), state.getBlockedId(), state.getType(), bundleId, upToDate, context);
+
final BlockingStateModelDao newBlockingStateModelDao = new BlockingStateModelDao(state, context);
- final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
// Get all blocking states for that blocked id and service
final List<BlockingStateModelDao> allForBlockedItAndService = sqlDao.getBlockingHistoryForService(state.getBlockedId(), state.getService(), context);
@@ -185,11 +226,111 @@ public class DefaultBlockingStateDao extends EntityDaoBase<BlockingStateModelDao
sqlDao.create(newBlockingStateModelDao, context);
}
+ final BlockingAggregator currentState = getBlockedStatus(sqlDao, entitySqlDaoWrapperFactory.getHandle(), state.getBlockedId(), state.getType(), bundleId, upToDate, context);
+ if (previousState != null && currentState != null) {
+ recordBusOrFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+ state.getId(),
+ state.getEffectiveDate(),
+ state.getBlockedId(),
+ state.getType(),
+ state.getService(),
+ previousState,
+ currentState,
+ context);
+ }
+
return null;
}
});
}
+ private BlockingAggregator getBlockedStatus(final BlockingStateSqlDao sqlDao, final Handle handle, final UUID blockableId, final BlockingStateType type, @Nullable final UUID bundleId, final DateTime upToDate, final InternalTenantContext context) throws BlockingApiException {
+ final List<BlockingState> accountBlockingStates;
+ final List<BlockingState> bundleBlockingStates;
+ final List<BlockingState> subscriptionBlockingStates;
+ if (type == BlockingStateType.SUBSCRIPTION) {
+ final UUID accountId = nonEntityDao.retrieveIdFromObjectInTransaction(context.getAccountRecordId(), ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.OBJECT_ID), handle);
+ accountBlockingStates = getBlockingState(sqlDao, accountId, BlockingStateType.ACCOUNT, upToDate, context);
+ bundleBlockingStates = getBlockingState(sqlDao, bundleId, BlockingStateType.SUBSCRIPTION_BUNDLE, upToDate, context);
+ subscriptionBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.SUBSCRIPTION, upToDate, context);
+ } else if (type == BlockingStateType.SUBSCRIPTION_BUNDLE) {
+ final UUID accountId = nonEntityDao.retrieveIdFromObjectInTransaction(context.getAccountRecordId(), ObjectType.ACCOUNT, cacheControllerDispatcher.getCacheController(CacheType.OBJECT_ID), handle);
+ accountBlockingStates = getBlockingState(sqlDao, accountId, BlockingStateType.ACCOUNT, upToDate, context);
+ bundleBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.SUBSCRIPTION_BUNDLE, upToDate, context);
+ subscriptionBlockingStates = ImmutableList.<BlockingState>of();
+ } else { // BlockingStateType.ACCOUNT {
+ accountBlockingStates = getBlockingState(sqlDao, blockableId, BlockingStateType.ACCOUNT, upToDate, context);
+ bundleBlockingStates = ImmutableList.<BlockingState>of();
+ subscriptionBlockingStates = ImmutableList.<BlockingState>of();
+ }
+ return statelessBlockingChecker.getBlockedState(accountBlockingStates, bundleBlockingStates, subscriptionBlockingStates);
+ }
+
+ private void recordBusOrFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+ final UUID blockingStateId,
+ final DateTime effectiveDate,
+ final UUID blockableId,
+ final BlockingStateType type,
+ final String serviceName,
+ final BlockingAggregator previousState,
+ final BlockingAggregator currentState,
+ final InternalCallContext context) {
+ final boolean isTransitionToBlockedBilling = !previousState.isBlockBilling() && currentState.isBlockBilling();
+ final boolean isTransitionToUnblockedBilling = previousState.isBlockBilling() && !currentState.isBlockBilling();
+
+ final boolean isTransitionToBlockedEntitlement = !previousState.isBlockEntitlement() && currentState.isBlockEntitlement();
+ final boolean isTransitionToUnblockedEntitlement = previousState.isBlockEntitlement() && !currentState.isBlockEntitlement();
+
+ if (effectiveDate.compareTo(clock.getUTCNow()) > 0) {
+ // Add notification entry to send the bus event at the effective date
+ final NotificationEvent notificationEvent = new BlockingTransitionNotificationKey(blockingStateId, blockableId, type,
+ isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
+ isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement);
+ recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationEvent, context);
+ } else {
+ // TODO Do we want to send a DefaultEffectiveEntitlementEvent for entitlement specific blocking states?
+ // Don't post if nothing has changed for entitlement-service
+ if (!serviceName.equals(EntitlementService.ENTITLEMENT_SERVICE_NAME) || !previousState.equals(currentState)) {
+ final BusEvent event = new DefaultBlockingTransitionInternalEvent(blockableId, type,
+ isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
+ isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement,
+ context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
+ notifyBusFromTransaction(entitySqlDaoWrapperFactory, event);
+ } else {
+ log.debug("Skipping event for service {} (previousState={}, currentState={})", serviceName, previousState, currentState);
+ }
+
+ }
+ }
+
+ private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+ final DateTime effectiveDate,
+ final NotificationEvent notificationEvent,
+ final InternalCallContext context) {
+ try {
+ final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME,
+ DefaultEntitlementService.NOTIFICATION_QUEUE_NAME);
+ subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(),
+ effectiveDate,
+ notificationEvent,
+ context.getUserToken(),
+ context.getAccountRecordId(),
+ context.getTenantRecordId());
+ } catch (final NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void notifyBusFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final BusEvent event) {
+ try {
+ eventBus.postFromTransaction(event, entitySqlDaoWrapperFactory.getHandle().getConnection());
+ } catch (final EventBusException e) {
+ log.warn("Failed to post event {}", e);
+ }
+ }
+
@Override
public void unactiveBlockingState(final UUID id, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
index cef565e..3c228f2 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/OptimizedProxyBlockingStateDao.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -34,7 +36,9 @@ import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
import org.killbill.billing.subscription.api.user.SubscriptionBaseBundle;
import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
import org.skife.jdbi.v2.IDBI;
import com.google.common.collect.ImmutableList;
@@ -42,9 +46,9 @@ import com.google.common.collect.ImmutableList;
public class OptimizedProxyBlockingStateDao extends ProxyBlockingStateDao {
public OptimizedProxyBlockingStateDao(final EventsStreamBuilder eventsStreamBuilder, final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
- final IDBI dbi, final Clock clock, final CacheControllerDispatcher cacheControllerDispatcher,
- final NonEntityDao nonEntityDao) {
- super(eventsStreamBuilder, subscriptionBaseInternalApi, dbi, clock, cacheControllerDispatcher, nonEntityDao);
+ final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
+ final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
+ super(eventsStreamBuilder, subscriptionBaseInternalApi, dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
}
/**
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
index e575ecb..4e6a6b9 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/dao/ProxyBlockingStateDao.java
@@ -47,7 +47,9 @@ import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.customfield.ShouldntHappenException;
import org.killbill.billing.util.dao.NonEntityDao;
import org.killbill.billing.util.entity.Pagination;
+import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
import org.skife.jdbi.v2.IDBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,12 +166,12 @@ public class ProxyBlockingStateDao implements BlockingStateDao {
@Inject
public ProxyBlockingStateDao(final EventsStreamBuilder eventsStreamBuilder, final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
- final IDBI dbi, final Clock clock,
+ final IDBI dbi, final Clock clock, final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
final CacheControllerDispatcher cacheControllerDispatcher, final NonEntityDao nonEntityDao) {
this.eventsStreamBuilder = eventsStreamBuilder;
this.subscriptionInternalApi = subscriptionBaseInternalApi;
this.clock = clock;
- this.delegate = new DefaultBlockingStateDao(dbi, clock, cacheControllerDispatcher, nonEntityDao);
+ this.delegate = new DefaultBlockingStateDao(dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
}
@Override
@@ -229,8 +231,8 @@ public class ProxyBlockingStateDao implements BlockingStateDao {
}
@Override
- public void setBlockingState(final BlockingState state, final InternalCallContext context) {
- delegate.setBlockingState(state, context);
+ public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
+ delegate.setBlockingStateAndPostBlockingTransitionEvent(state, bundleId, context);
}
@Override
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
index 8256d96..b0aee5b 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EntitlementUtils.java
@@ -18,88 +18,62 @@
package org.killbill.billing.entitlement.engine.core;
-import java.io.IOException;
import java.util.UUID;
import javax.inject.Inject;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.killbill.bus.api.BusEvent;
-import org.killbill.bus.api.PersistentBus;
-import org.killbill.bus.api.PersistentBus.EventBusException;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.callcontext.InternalTenantContext;
-import org.killbill.clock.Clock;
import org.killbill.billing.entitlement.DefaultEntitlementService;
-import org.killbill.billing.entitlement.EntitlementService;
-import org.killbill.billing.entitlement.api.BlockingApiException;
import org.killbill.billing.entitlement.api.BlockingState;
import org.killbill.billing.entitlement.api.BlockingStateType;
-import org.killbill.billing.entitlement.api.DefaultBlockingTransitionInternalEvent;
import org.killbill.billing.entitlement.api.DefaultEntitlementApi;
-import org.killbill.billing.entitlement.block.BlockingChecker;
-import org.killbill.billing.entitlement.block.BlockingChecker.BlockingAggregator;
import org.killbill.billing.entitlement.dao.BlockingStateDao;
-import org.killbill.notificationq.api.NotificationEvent;
-import org.killbill.notificationq.api.NotificationQueue;
-import org.killbill.notificationq.api.NotificationQueueService;
-import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
+import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
+import org.killbill.notificationq.api.NotificationQueueService;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
public class EntitlementUtils {
- private static final Logger log = LoggerFactory.getLogger(EntitlementUtils.class);
+ protected final NotificationQueueService notificationQueueService;
private final BlockingStateDao dao;
- private final BlockingChecker blockingChecker;
private final SubscriptionBaseInternalApi subscriptionBaseInternalApi;
- private final PersistentBus eventBus;
- private final Clock clock;
- protected final NotificationQueueService notificationQueueService;
@Inject
- public EntitlementUtils(final BlockingStateDao dao, final BlockingChecker blockingChecker,
- final PersistentBus eventBus, final Clock clock,
+ public EntitlementUtils(final BlockingStateDao dao,
final SubscriptionBaseInternalApi subscriptionBaseInternalApi,
final NotificationQueueService notificationQueueService) {
this.dao = dao;
- this.blockingChecker = blockingChecker;
- this.eventBus = eventBus;
- this.clock = clock;
this.subscriptionBaseInternalApi = subscriptionBaseInternalApi;
this.notificationQueueService = notificationQueueService;
}
/**
- * Wrapper around BlockingStateDao#setBlockingState which will send an event on the bus if needed
- *
* @param state new state to store
* @param context call context
*/
public void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final InternalCallContext context) {
- final BlockingAggregator previousState = getBlockingStateFor(state.getBlockedId(), state.getType(), clock.getUTCNow(), context);
-
- dao.setBlockingState(state, context);
-
- final BlockingAggregator currentState = getBlockingStateFor(state.getBlockedId(), state.getType(), state.getEffectiveDate(), context);
- if (previousState != null && currentState != null) {
- postBlockingTransitionEvent(state.getId(), state.getEffectiveDate(), state.getBlockedId(), state.getType(), state.getService(), previousState, currentState, context);
+ UUID bundleId = null;
+ if (state.getType() == BlockingStateType.SUBSCRIPTION) {
+ try {
+ bundleId = subscriptionBaseInternalApi.getSubscriptionFromId(state.getBlockedId(), context).getBundleId();
+ } catch (final SubscriptionBaseApiException e) {
+ throw new RuntimeException(e);
+ }
}
+ dao.setBlockingStateAndPostBlockingTransitionEvent(state, bundleId, context);
}
/**
- *
- * @param externalKey the bundle externalKey
+ * @param externalKey the bundle externalKey
* @param tenantContext the context
* @return the id of the first subscription (BASE or STANDALONE) that is still active for that key
*/
- public UUID getFirstActiveSubscriptionIdForKeyOrNull(final String externalKey, final InternalTenantContext tenantContext) {
+ public UUID getFirstActiveSubscriptionIdForKeyOrNull(final String externalKey, final InternalTenantContext tenantContext) {
final Iterable<UUID> nonAddonUUIDs = subscriptionBaseInternalApi.getNonAOSubscriptionIdsForKey(externalKey, tenantContext);
return Iterables.tryFind(nonAddonUUIDs, new Predicate<UUID>() {
@@ -110,70 +84,4 @@ public class EntitlementUtils {
}
}).orNull();
}
-
-
- private BlockingAggregator getBlockingStateFor(final UUID blockableId, final BlockingStateType type, final DateTime effectiveDate, final InternalCallContext context) {
- try {
- return blockingChecker.getBlockedStatus(blockableId, type, effectiveDate, context);
- } catch (BlockingApiException e) {
- log.warn("Failed to retrieve blocking state for {} {}", blockableId, type);
- return null;
- }
- }
-
- private void postBlockingTransitionEvent(final UUID blockingStateId, final DateTime effectiveDate, final UUID blockableId, final BlockingStateType type,
- final String serviceName, final BlockingAggregator previousState, final BlockingAggregator currentState,
- final InternalCallContext context) {
- final boolean isTransitionToBlockedBilling = !previousState.isBlockBilling() && currentState.isBlockBilling();
- final boolean isTransitionToUnblockedBilling = previousState.isBlockBilling() && !currentState.isBlockBilling();
-
- final boolean isTransitionToBlockedEntitlement = !previousState.isBlockEntitlement() && currentState.isBlockEntitlement();
- final boolean isTransitionToUnblockedEntitlement = previousState.isBlockEntitlement() && !currentState.isBlockEntitlement();
-
- if (effectiveDate.compareTo(clock.getUTCNow()) > 0) {
- // Add notification entry to send the bus event at the effective date
- final NotificationEvent notificationEvent = new BlockingTransitionNotificationKey(blockingStateId, blockableId, type,
- isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
- isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement);
- recordFutureNotification(effectiveDate, notificationEvent, context);
- } else {
- // TODO Do we want to send a DefaultEffectiveEntitlementEvent for entitlement specific blocking states?
-
- // Don't post if nothing has changed for entitlement-service
- if (! serviceName.equals(EntitlementService.ENTITLEMENT_SERVICE_NAME) || ! previousState.equals(currentState)) {
- final BusEvent event = new DefaultBlockingTransitionInternalEvent(blockableId, type,
- isTransitionToBlockedBilling, isTransitionToUnblockedBilling,
- isTransitionToBlockedEntitlement, isTransitionToUnblockedEntitlement,
- context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
- postBusEvent(event);
- } else {
- System.out.println("********** SKIPPING EVENT ");
- }
-
- }
- }
-
- private void postBusEvent(final BusEvent event) {
- try {
- // TODO STEPH Ideally we would like to post from transaction when we inserted the new blocking state, but new state would have to be recalculated from transaction which is
- // difficult without the help of BlockingChecker -- which itself relies on dao. Other alternative is duplicating the logic, or refactoring the DAO to export higher level api.
- eventBus.post(event);
- } catch (EventBusException e) {
- log.warn("Failed to post event {}", e);
- }
- }
-
- private void recordFutureNotification(final DateTime effectiveDate,
- final NotificationEvent notificationEvent,
- final InternalCallContext context) {
- try {
- final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME,
- DefaultEntitlementService.NOTIFICATION_QUEUE_NAME);
- subscriptionEventQueue.recordFutureNotification(effectiveDate, notificationEvent, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
- } catch (NoSuchNotificationQueue e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
index afd51c8..b090a03 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/EventsStreamBuilder.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -53,7 +55,9 @@ import org.killbill.billing.util.cache.CacheControllerDispatcher;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.TenantContext;
import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.bus.api.PersistentBus;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
import org.skife.jdbi.v2.IDBI;
import com.google.common.base.Objects;
@@ -75,6 +79,7 @@ public class EventsStreamBuilder {
@Inject
public EventsStreamBuilder(final AccountInternalApi accountInternalApi, final SubscriptionBaseInternalApi subscriptionInternalApi,
final BlockingChecker checker, final IDBI dbi, final Clock clock,
+ final NotificationQueueService notificationQueueService, final PersistentBus eventBus,
final CacheControllerDispatcher cacheControllerDispatcher,
final NonEntityDao nonEntityDao,
final InternalCallContextFactory internalCallContextFactory) {
@@ -84,8 +89,8 @@ public class EventsStreamBuilder {
this.clock = clock;
this.internalCallContextFactory = internalCallContextFactory;
- this.defaultBlockingStateDao = new DefaultBlockingStateDao(dbi, clock, cacheControllerDispatcher, nonEntityDao);
- this.blockingStateDao = new OptimizedProxyBlockingStateDao(this, subscriptionInternalApi, dbi, clock, cacheControllerDispatcher, nonEntityDao);
+ this.defaultBlockingStateDao = new DefaultBlockingStateDao(dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
+ this.blockingStateDao = new OptimizedProxyBlockingStateDao(this, subscriptionInternalApi, dbi, clock, notificationQueueService, eventBus, cacheControllerDispatcher, nonEntityDao);
}
public EventsStream refresh(final EventsStream eventsStream, final TenantContext tenantContext) throws EntitlementApiException {
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java b/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
index 8689df2..37c0e6b 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/block/TestBlockingChecker.java
@@ -73,17 +73,17 @@ public class TestBlockingChecker extends EntitlementTestSuiteNoDB {
private void setStateBundle(final boolean bC, final boolean bE, final boolean bB) {
final BlockingState bundleState = new DefaultBlockingState(bundle.getId(), BlockingStateType.SUBSCRIPTION_BUNDLE,"state", "test-service", bC, bE, bB, clock.getUTCNow());
- blockingStateDao.setBlockingState(bundleState, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(bundleState, null, internalCallContext);
}
private void setStateAccount(final boolean bC, final boolean bE, final boolean bB) {
final BlockingState accountState = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state", "test-service", bC, bE, bB, clock.getUTCNow());
- blockingStateDao.setBlockingState(accountState, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(accountState, null, internalCallContext);
}
private void setStateSubscription(final boolean bC, final boolean bE, final boolean bB) {
final BlockingState subscriptionState = new DefaultBlockingState(subscription.getId(), BlockingStateType.SUBSCRIPTION, "state", "test-service", bC, bE, bB, clock.getUTCNow());
- blockingStateDao.setBlockingState(subscriptionState, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(subscriptionState, subscription.getBundleId(), internalCallContext);
}
@Test(groups = "fast")
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
index 52ff2f3..4fc146b 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/MockBlockingStateDao.java
@@ -84,7 +84,7 @@ public class MockBlockingStateDao extends MockEntityDaoBase<BlockingStateModelDa
}
@Override
- public synchronized void setBlockingState(final BlockingState state, final InternalCallContext context) {
+ public synchronized void setBlockingStateAndPostBlockingTransitionEvent(final BlockingState state, final UUID bundleId, final InternalCallContext context) {
if (blockingStates.get(state.getBlockedId()) == null) {
blockingStates.put(state.getBlockedId(), new ArrayList<BlockingState>());
}
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
index b5addb7..ba25a23 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestBlockingDao.java
@@ -55,13 +55,13 @@ public class TestBlockingDao extends EntitlementTestSuiteWithEmbeddedDB {
clock.setDay(new LocalDate(2012, 4, 1));
final BlockingState state1 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName, service, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
- blockingStateDao.setBlockingState(state1, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state1, null, internalCallContext);
clock.addDays(1);
final String overdueStateName2 = "NoReallyThisCantGoOn";
final BlockingState state2 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName2, service, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
- blockingStateDao.setBlockingState(state2, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state2, null, internalCallContext);
Assert.assertEquals(blockingStateDao.getBlockingStateForService(uuid, BlockingStateType.ACCOUNT, service, internalCallContext).getStateName(), state2.getStateName());
@@ -83,14 +83,14 @@ public class TestBlockingDao extends EntitlementTestSuiteWithEmbeddedDB {
final boolean blockBilling = false;
final BlockingState state1 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName, service1, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
- blockingStateDao.setBlockingState(state1, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state1, null, internalCallContext);
clock.setDeltaFromReality(1000 * 3600 * 24);
final String service2 = "TEST2";
final String overdueStateName2 = "NoReallyThisCantGoOn";
final BlockingState state2 = new DefaultBlockingState(uuid, BlockingStateType.ACCOUNT, overdueStateName2, service2, blockChange, blockEntitlement, blockBilling, clock.getUTCNow());
- blockingStateDao.setBlockingState(state2, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(state2, null, internalCallContext);
final List<BlockingState> history2 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(history2.size(), 2);
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
index bb066b4..8935cdd 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/dao/TestDefaultBlockingStateDao.java
@@ -74,7 +74,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set a state
final DateTime stateDateTime = new DateTime(2013, 5, 6, 10, 11, 12, DateTimeZone.UTC);
final BlockingState blockingState = new DefaultBlockingState(entitlement.getId(), type, state, service, false, false, false, stateDateTime);
- blockingStateDao.setBlockingState(blockingState, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState, entitlement.getBundleId(), internalCallContext);
Assert.assertEquals(blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext).size(), 1);
}
@@ -97,7 +97,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set a state for service A
final DateTime stateDateTime = new DateTime(2013, 5, 6, 10, 11, 12, DateTimeZone.UTC);
final BlockingState blockingState1 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime);
- blockingStateDao.setBlockingState(blockingState1, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState1, null, internalCallContext);
final List<BlockingState> blockingStates1 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates1.size(), 1);
Assert.assertEquals(blockingStates1.get(0).getBlockedId(), blockableId);
@@ -106,7 +106,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
Assert.assertEquals(blockingStates1.get(0).getEffectiveDate(), stateDateTime);
// Set the same state again - no change
- blockingStateDao.setBlockingState(blockingState1, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState1, null, internalCallContext);
final List<BlockingState> blockingStates2 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates2.size(), 1);
Assert.assertEquals(blockingStates2.get(0).getBlockedId(), blockableId);
@@ -116,7 +116,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set the state for service B
final BlockingState blockingState2 = new DefaultBlockingState(blockableId, type, state, serviceB, false, false, false, stateDateTime);
- blockingStateDao.setBlockingState(blockingState2, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState2, null, internalCallContext);
final List<BlockingState> blockingStates3 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates3.size(), 2);
Assert.assertEquals(blockingStates3.get(0).getBlockedId(), blockableId);
@@ -131,7 +131,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set the state for service A in the future - there should be no change (already effective)
final DateTime stateDateTime2 = new DateTime(2013, 6, 6, 10, 11, 12, DateTimeZone.UTC);
final BlockingState blockingState3 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime2);
- blockingStateDao.setBlockingState(blockingState3, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState3, null, internalCallContext);
final List<BlockingState> blockingStates4 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates4.size(), 2);
Assert.assertEquals(blockingStates4.get(0).getBlockedId(), blockableId);
@@ -146,7 +146,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set the state for service A in the past - the new effective date should be respected
final DateTime stateDateTime3 = new DateTime(2013, 2, 6, 10, 11, 12, DateTimeZone.UTC);
final BlockingState blockingState4 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime3);
- blockingStateDao.setBlockingState(blockingState4, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState4, null, internalCallContext);
final List<BlockingState> blockingStates5 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates5.size(), 2);
Assert.assertEquals(blockingStates5.get(0).getBlockedId(), blockableId);
@@ -161,7 +161,7 @@ public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbedde
// Set a new state for service A
final DateTime state2DateTime = new DateTime(2013, 12, 6, 10, 11, 12, DateTimeZone.UTC);
final BlockingState blockingState5 = new DefaultBlockingState(blockableId, type, state2, serviceA, false, false, false, state2DateTime);
- blockingStateDao.setBlockingState(blockingState5, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(blockingState5, null, internalCallContext);
final List<BlockingState> blockingStates6 = blockingStateDao.getBlockingAllForAccountRecordId(internalCallContext);
Assert.assertEquals(blockingStates6.size(), 3);
Assert.assertEquals(blockingStates6.get(0).getBlockedId(), blockableId);
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
index a9f10f7..e6e0041 100644
--- a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBillingApi.java
@@ -182,8 +182,8 @@ public class TestBillingApi extends JunctionTestSuiteNoDB {
final Account account = createAccount(32);
- blockingStateDao.setBlockingState(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, now.plusDays(1)), internalCallContext);
- blockingStateDao.setBlockingState(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, now.plusDays(2)), internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, now.plusDays(1)), null, internalCallContext);
+ blockingStateDao.setBlockingStateAndPostBlockingTransitionEvent(new DefaultBlockingState(bunId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, now.plusDays(2)), null, internalCallContext);
final SortedSet<BillingEvent> events = billingInternalApi.getBillingEventsForAccountAndUpdateAccountBCD(account.getId(), null, internalCallContext);