killbill-uncached

entitlement: avoid inserting duplicate blocking states This

10/29/2013 5:58:37 PM

Details

diff --git a/.idea/dictionaries/pierre.xml b/.idea/dictionaries/pierre.xml
index 044303d..987b7a2 100644
--- a/.idea/dictionaries/pierre.xml
+++ b/.idea/dictionaries/pierre.xml
@@ -2,6 +2,7 @@
   <dictionary name="pierre">
     <words>
       <w>aoped</w>
+      <w>blockable</w>
       <w>daos</w>
       <w>guice</w>
       <w>infos</w>
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
index 7eb1882..295f529 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/DefaultEntitlement.java
@@ -241,13 +241,13 @@ public class DefaultEntitlement extends EntityBase implements Entitlement {
                 return EntitlementService.ENTITLEMENT_SERVICE_NAME.equals(input.getService()) && input.getEffectiveDate().isAfter(clock.getUTCNow());
             }
         });
-        final BlockingState futureCancellation = filtered.iterator().hasNext() ? filtered.iterator().next() : null;
-        if (futureCancellation == null) {
-            return;
-        }
 
         // Reactivate entitlement
-        blockingStateDao.unactiveBlockingState(futureCancellation.getId(), contextWithValidAccountRecordId);
+        // We should only have one future event in theory - but cleanup the data if it's not the case
+        // See https://github.com/killbill/killbill/issues/111
+        for (final BlockingState futureCancellation : filtered) {
+            blockingStateDao.unactiveBlockingState(futureCancellation.getId(), contextWithValidAccountRecordId);
+        }
 
         // If billing was previously cancelled, reactivate
         if (subscriptionBase.getFutureEndDate() != null) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/dao/DefaultBlockingStateDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/dao/DefaultBlockingStateDao.java
index 602ba75..c86331b 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/dao/DefaultBlockingStateDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/dao/DefaultBlockingStateDao.java
@@ -17,6 +17,9 @@
 package com.ning.billing.entitlement.dao;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 
@@ -25,11 +28,11 @@ import javax.inject.Inject;
 
 import org.skife.jdbi.v2.IDBI;
 
-import com.ning.billing.entitlement.api.BlockingState;
-import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.callcontext.InternalCallContext;
 import com.ning.billing.callcontext.InternalTenantContext;
 import com.ning.billing.clock.Clock;
+import com.ning.billing.entitlement.api.BlockingState;
+import com.ning.billing.util.cache.CacheControllerDispatcher;
 import com.ning.billing.util.dao.NonEntityDao;
 import com.ning.billing.util.entity.dao.EntitySqlDao;
 import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
@@ -38,6 +41,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Ordering;
 
 public class DefaultBlockingStateDao implements BlockingStateDao {
 
@@ -52,7 +56,7 @@ public class DefaultBlockingStateDao implements BlockingStateDao {
 
     @Override
     public BlockingState getBlockingStateForService(final UUID blockableId, final String serviceName, final InternalTenantContext context) {
-        return transactionalSqlDao.execute(  new EntitySqlDaoTransactionWrapper<BlockingState>() {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<BlockingState>() {
             @Override
             public BlockingState inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final BlockingStateModelDao model = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class).getBlockingStateForService(blockableId, serviceName, clock.getUTCNow().toDate(), context);
@@ -64,10 +68,10 @@ public class DefaultBlockingStateDao implements BlockingStateDao {
 
     @Override
     public List<BlockingState> getBlockingState(final UUID blockableId, final InternalTenantContext context) {
-        return transactionalSqlDao.execute(  new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
+        return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<BlockingState>>() {
             @Override
             public List<BlockingState> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
-                final  List<BlockingStateModelDao> models = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class).getBlockingState(blockableId, clock.getUTCNow().toDate(), context);
+                final List<BlockingStateModelDao> models = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class).getBlockingState(blockableId, clock.getUTCNow().toDate(), context);
                 return new ArrayList<BlockingState>(Collections2.transform(models, new Function<BlockingStateModelDao, BlockingState>() {
                     @Override
                     public BlockingState apply(@Nullable final BlockingStateModelDao src) {
@@ -147,8 +151,58 @@ public class DefaultBlockingStateDao implements BlockingStateDao {
         transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+                final BlockingStateModelDao newBlockingStateModelDao = new BlockingStateModelDao(state, context);
+
                 final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
-                sqlDao.create(new BlockingStateModelDao(state, context), context);
+                // Get all blocking states for that blocked id and service
+                final List<BlockingStateModelDao> allForBlockedItAndService = sqlDao.getBlockingHistoryForService(state.getBlockedId(), state.getService(), clock.getUTCNow().toDate(), context);
+
+                // Add the new one (we rely below on the fact that the ID for newBlockingStateModelDao is now set)
+                allForBlockedItAndService.add(newBlockingStateModelDao);
+
+                // Re-order what should be the final list (allForBlockedItAndService is ordered by record_id in the SQL and we just added a new state)
+                final List<BlockingStateModelDao> allForBlockedItAndServiceOrdered = Ordering.<BlockingStateModelDao>from(new Comparator<BlockingStateModelDao>() {
+                    @Override
+                    public int compare(final BlockingStateModelDao o1, final BlockingStateModelDao o2) {
+                        // effective_date column NOT NULL
+                        final int comparison = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
+                        if (comparison == 0) {
+                            // Keep a stable ordering for ties
+                            return o1.getCreatedDate().compareTo(o2.getCreatedDate());
+                        } else {
+                            return comparison;
+                        }
+                    }
+                }).immutableSortedCopy(allForBlockedItAndService);
+
+                // Go through the (ordered) stream of blocking states for that blocked id and service and check
+                // if there is one or more blocking states for the same state following each others.
+                // If there are, delete them, as they are not needed anymore. A picture being worth a thousand words,
+                // if the current stream is: t0 S1 t1 S2 t3 S3 and we want to insert S2 at t0 < t1' < t1,
+                // the final stream should be: t0 S1 t1' S2 t3 S3 (and not t0 S1 t1' S2 t1 S2 t3 S3)
+                // Note that we also take care of the use case t0 S1 t1 S2 t2 S2 t3 S3 to cleanup legacy systems, although
+                // it shouldn't happen anymore
+                final Collection<UUID> blockingStatesToRemove = new HashSet<UUID>();
+                BlockingStateModelDao prevBlockingStateModelDao = null;
+                for (final BlockingStateModelDao blockingStateModelDao : allForBlockedItAndServiceOrdered) {
+                    if (prevBlockingStateModelDao != null && prevBlockingStateModelDao.getState().equals(blockingStateModelDao.getState())) {
+                        blockingStatesToRemove.add(blockingStateModelDao.getId());
+                    }
+                    prevBlockingStateModelDao = blockingStateModelDao;
+                }
+
+                // Delete unnecessary states (except newBlockingStateModelDao, which doesn't exist in the database)
+                for (final UUID blockedId : blockingStatesToRemove) {
+                    if (!newBlockingStateModelDao.getId().equals(blockedId)) {
+                        sqlDao.unactiveEvent(blockedId.toString(), context);
+                    }
+                }
+
+                // Create the state, if needed
+                if (!blockingStatesToRemove.contains(newBlockingStateModelDao.getId())) {
+                    sqlDao.create(new BlockingStateModelDao(state, context), context);
+                }
+
                 return null;
             }
         });
@@ -160,7 +214,7 @@ public class DefaultBlockingStateDao implements BlockingStateDao {
             @Override
             public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
                 final BlockingStateSqlDao sqlDao = entitySqlDaoWrapperFactory.become(BlockingStateSqlDao.class);
-                sqlDao.unactiveEvent(id.toString() , context);
+                sqlDao.unactiveEvent(id.toString(), context);
                 return null;
             }
         });
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/dao/TestDefaultBlockingStateDao.java b/entitlement/src/test/java/com/ning/billing/entitlement/dao/TestDefaultBlockingStateDao.java
new file mode 100644
index 0000000..31a87a2
--- /dev/null
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/dao/TestDefaultBlockingStateDao.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2010-2013 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.dao;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.ning.billing.entitlement.EntitlementTestSuiteWithEmbeddedDB;
+import com.ning.billing.entitlement.api.BlockingState;
+import com.ning.billing.entitlement.api.BlockingStateType;
+import com.ning.billing.junction.DefaultBlockingState;
+
+public class TestDefaultBlockingStateDao extends EntitlementTestSuiteWithEmbeddedDB {
+
+    // See https://github.com/killbill/killbill/issues/111
+    @Test(groups = "slow", description = "Verify we don't insert duplicate blocking states")
+    public void testSetBlockingState() throws Exception {
+        final UUID blockableId = UUID.randomUUID();
+        final BlockingStateType type = BlockingStateType.ACCOUNT;
+        final String state = "state";
+        final String state2 = "state-2";
+        final String serviceA = "service-A";
+        final String serviceB = "service-B";
+
+        // Verify initial state
+        Assert.assertEquals(blockingStateDao.getBlockingAll(blockableId, internalCallContext).size(), 0);
+
+        // Note: the checkers below rely on record_id ordering, not effective date
+
+        // Set a state for service A
+        final DateTime stateDateTime = new DateTime(2013, 5, 6, 10, 11, 12, DateTimeZone.UTC);
+        final BlockingState blockingState1 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime);
+        blockingStateDao.setBlockingState(blockingState1, clock, internalCallContext);
+        final List<BlockingState> blockingStates1 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates1.size(), 1);
+        Assert.assertEquals(blockingStates1.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates1.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates1.get(0).getService(), serviceA);
+        Assert.assertEquals(blockingStates1.get(0).getEffectiveDate(), stateDateTime);
+
+        // Set the same state again - no change
+        blockingStateDao.setBlockingState(blockingState1, clock, internalCallContext);
+        final List<BlockingState> blockingStates2 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates2.size(), 1);
+        Assert.assertEquals(blockingStates2.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates2.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates2.get(0).getService(), serviceA);
+        Assert.assertEquals(blockingStates2.get(0).getEffectiveDate(), stateDateTime);
+
+        // Set the state for service B
+        final BlockingState blockingState2 = new DefaultBlockingState(blockableId, type, state, serviceB, false, false, false, stateDateTime);
+        blockingStateDao.setBlockingState(blockingState2, clock, internalCallContext);
+        final List<BlockingState> blockingStates3 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates3.size(), 2);
+        Assert.assertEquals(blockingStates3.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates3.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates3.get(0).getService(), serviceA);
+        Assert.assertEquals(blockingStates3.get(0).getEffectiveDate(), stateDateTime);
+        Assert.assertEquals(blockingStates3.get(1).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates3.get(1).getStateName(), state);
+        Assert.assertEquals(blockingStates3.get(1).getService(), serviceB);
+        Assert.assertEquals(blockingStates3.get(1).getEffectiveDate(), stateDateTime);
+
+        // Set the state for service A in the future - there should be no change (already effective)
+        final DateTime stateDateTime2 = new DateTime(2013, 6, 6, 10, 11, 12, DateTimeZone.UTC);
+        final BlockingState blockingState3 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime2);
+        blockingStateDao.setBlockingState(blockingState3, clock, internalCallContext);
+        final List<BlockingState> blockingStates4 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates4.size(), 2);
+        Assert.assertEquals(blockingStates4.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates4.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates4.get(0).getService(), serviceA);
+        Assert.assertEquals(blockingStates4.get(0).getEffectiveDate(), stateDateTime);
+        Assert.assertEquals(blockingStates4.get(1).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates4.get(1).getStateName(), state);
+        Assert.assertEquals(blockingStates4.get(1).getService(), serviceB);
+        Assert.assertEquals(blockingStates4.get(1).getEffectiveDate(), stateDateTime);
+
+        // Set the state for service A in the past - the new effective date should be respected
+        final DateTime stateDateTime3 = new DateTime(2013, 2, 6, 10, 11, 12, DateTimeZone.UTC);
+        final BlockingState blockingState4 = new DefaultBlockingState(blockableId, type, state, serviceA, false, false, false, stateDateTime3);
+        blockingStateDao.setBlockingState(blockingState4, clock, internalCallContext);
+        final List<BlockingState> blockingStates5 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates5.size(), 2);
+        Assert.assertEquals(blockingStates5.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates5.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates5.get(0).getService(), serviceB);
+        Assert.assertEquals(blockingStates5.get(0).getEffectiveDate(), stateDateTime);
+        Assert.assertEquals(blockingStates5.get(1).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates5.get(1).getStateName(), state);
+        Assert.assertEquals(blockingStates5.get(1).getService(), serviceA);
+        Assert.assertEquals(blockingStates5.get(1).getEffectiveDate(), stateDateTime3);
+
+        // Set a new state for service A
+        final DateTime state2DateTime = new DateTime(2013, 12, 6, 10, 11, 12, DateTimeZone.UTC);
+        final BlockingState blockingState5 = new DefaultBlockingState(blockableId, type, state2, serviceA, false, false, false, state2DateTime);
+        blockingStateDao.setBlockingState(blockingState5, clock, internalCallContext);
+        final List<BlockingState> blockingStates6 = blockingStateDao.getBlockingAll(blockableId, internalCallContext);
+        Assert.assertEquals(blockingStates6.size(), 3);
+        Assert.assertEquals(blockingStates6.get(0).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates6.get(0).getStateName(), state);
+        Assert.assertEquals(blockingStates6.get(0).getService(), serviceB);
+        Assert.assertEquals(blockingStates6.get(0).getEffectiveDate(), stateDateTime);
+        Assert.assertEquals(blockingStates6.get(1).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates6.get(1).getStateName(), state);
+        Assert.assertEquals(blockingStates6.get(1).getService(), serviceA);
+        Assert.assertEquals(blockingStates6.get(1).getEffectiveDate(), stateDateTime3);
+        Assert.assertEquals(blockingStates6.get(2).getBlockedId(), blockableId);
+        Assert.assertEquals(blockingStates6.get(2).getStateName(), state2);
+        Assert.assertEquals(blockingStates6.get(2).getService(), serviceA);
+        Assert.assertEquals(blockingStates6.get(2).getEffectiveDate(), state2DateTime);
+    }
+}