killbill-memoizeit

Details

diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestMigrationSubscriptions.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestMigrationSubscriptions.java
index 16eadea..6e9c19a 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestMigrationSubscriptions.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestMigrationSubscriptions.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -22,27 +22,39 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDate;
+import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountData;
 import org.killbill.billing.api.TestApiListener.NextEvent;
 import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
 import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.catalog.api.PhaseType;
+import org.killbill.billing.catalog.api.PlanPhasePriceOverride;
 import org.killbill.billing.catalog.api.PlanPhaseSpecifier;
 import org.killbill.billing.catalog.api.PriceListSet;
 import org.killbill.billing.entitlement.api.BaseEntitlementWithAddOnsSpecifier;
+import org.killbill.billing.entitlement.api.BlockingState;
+import org.killbill.billing.entitlement.api.BlockingStateType;
 import org.killbill.billing.entitlement.api.DefaultEntitlementSpecifier;
 import org.killbill.billing.entitlement.api.Entitlement;
 import org.killbill.billing.entitlement.api.Entitlement.EntitlementState;
 import org.killbill.billing.entitlement.api.EntitlementSpecifier;
 import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.junction.DefaultBlockingState;
+import org.killbill.billing.mock.MockAccountBuilder;
 import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.tag.ControlTagType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import static org.testng.Assert.assertNotNull;
+
 //
 // These scenarios emulate commons migrations problems (they go on verifying proper entitlement startDate, and proper billing startDate along with invoices, ..)
 //
@@ -321,6 +333,158 @@ public class TestMigrationSubscriptions extends TestIntegrationBase {
         assertListenerStatus();
     }
 
+    // Not exactly migration tests, but verify correct behavior when using BlockingState (see https://github.com/killbill/killbill/issues/744)
+
+    @Test(groups = "slow")
+    public void testBlockingStatesV1() throws Exception {
+        final DateTime initialDate = new DateTime(2017, 3, 1, 0, 1, 35, 0, DateTimeZone.UTC);
+        clock.setDeltaFromReality(initialDate.getMillis() - clock.getUTCNow().getMillis());
+
+        final Account account = createAccountWithNonOsgiPaymentMethod(getAccountData(0));
+        assertNotNull(account);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK);
+        final BlockingState blockingState1 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state1", "Service", false, false, true, null);
+        subscriptionApi.addBlockingState(blockingState1, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addDays(1);
+
+        final PlanPhaseSpecifier spec = new PlanPhaseSpecifier("pistol-monthly-notrial", null);
+        busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.BLOCK);
+        entitlementApi.createBaseEntitlement(account.getId(), spec, "bundleExternalKey", ImmutableList.<PlanPhasePriceOverride>of(), null, null, false, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addMonths(1);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        final BlockingState blockingState2 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state2", "Service", false, false, false, null);
+        subscriptionApi.addBlockingState(blockingState2, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addMonths(1);
+        assertListenerStatus();
+    }
+
+    @Test(groups = "slow")
+    public void testBlockingStatesV2() throws Exception {
+        final DateTime initialDate = new DateTime(2017, 3, 1, 0, 1, 35, 0, DateTimeZone.UTC);
+        clock.setDeltaFromReality(initialDate.getMillis() - clock.getUTCNow().getMillis());
+
+        final Account account = createAccountWithNonOsgiPaymentMethod(getAccountData(0));
+        assertNotNull(account);
+
+        final BlockingState blockingState1 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state1", "Service", false, false, true, null);
+        final PlanPhaseSpecifier spec = new PlanPhaseSpecifier("pistol-monthly-notrial", null);
+
+        // Unlike the previous scenario, we create the subscription and set the blocking state at the same time
+        busHandler.pushExpectedEvents(NextEvent.BLOCK, NextEvent.CREATE, NextEvent.BLOCK);
+        subscriptionApi.addBlockingState(blockingState1, null, ImmutableList.<PluginProperty>of(), callContext);
+        entitlementApi.createBaseEntitlement(account.getId(), spec, "bundleExternalKey", ImmutableList.<PlanPhasePriceOverride>of(), null, null, false, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addMonths(1);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        final BlockingState blockingState2 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state2", "Service", false, false, false, null);
+        subscriptionApi.addBlockingState(blockingState2, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addMonths(1);
+        assertListenerStatus();
+    }
+
+    @Test(groups = "slow")
+    public void testBlockingStatesV3() throws Exception {
+        final DateTimeZone timeZone = DateTimeZone.forID("America/Los_Angeles");
+
+        // 2017-03-12 00:01:35 (change to DST happens at 2am on that day)
+        final DateTime initialDate = new DateTime(2017, 3, 12, 0, 1, 35, 0, timeZone);
+        clock.setDeltaFromReality(initialDate.getMillis() - clock.getUTCNow().getMillis());
+
+        // Account in PDT
+        final AccountData accountData = new MockAccountBuilder().currency(Currency.USD)
+                                                                .timeZone(timeZone)
+                                                                .build();
+
+        final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+        assertNotNull(account);
+
+        busHandler.pushExpectedEvent(NextEvent.TAG);
+        tagUserApi.addTag(account.getId(), ObjectType.ACCOUNT, ControlTagType.AUTO_INVOICING_OFF.getId(), callContext);
+        assertListenerStatus();
+
+        final PlanPhaseSpecifier spec = new PlanPhaseSpecifier("pistol-monthly-notrial", null);
+        busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.BLOCK);
+        entitlementApi.createBaseEntitlement(account.getId(), spec, "bundleExternalKey", ImmutableList.<PlanPhasePriceOverride>of(), null, null, false, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        // Add less than a day between the CREATE and the BLOCK, to verify invoicing behavior
+        clock.setTime(initialDate.plusHours(23).plusMinutes(30));
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK);
+        final BlockingState blockingState1 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state1", "Service", false, false, true, null);
+        subscriptionApi.addBlockingState(blockingState1, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        busHandler.pushExpectedEvents(NextEvent.TAG, NextEvent.NULL_INVOICE);
+        tagUserApi.removeTag(account.getId(), ObjectType.ACCOUNT, ControlTagType.AUTO_INVOICING_OFF.getId(), callContext);
+        assertListenerStatus();
+
+        clock.addMonths(1);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        final BlockingState blockingState2 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state2", "Service", false, false, false, null);
+        subscriptionApi.addBlockingState(blockingState2, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addMonths(1);
+        assertListenerStatus();
+    }
+
+    @Test(groups = "slow")
+    public void testBlockingStatesV4() throws Exception {
+        final DateTime initialDate = new DateTime(2017, 3, 1, 0, 1, 35, 0, DateTimeZone.UTC);
+        clock.setDeltaFromReality(initialDate.getMillis() - clock.getUTCNow().getMillis());
+
+        final Account account = createAccountWithNonOsgiPaymentMethod(getAccountData(0));
+        assertNotNull(account);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK);
+        final BlockingState blockingState1 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state1", "Service", false, false, true, null);
+        subscriptionApi.addBlockingState(blockingState1, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addDays(1);
+
+        final PlanPhaseSpecifier spec = new PlanPhaseSpecifier("pistol-monthly-notrial", null);
+        busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.BLOCK);
+        final Entitlement baseEntitlement = entitlementApi.createBaseEntitlement(account.getId(), spec, "bundleExternalKey", ImmutableList.<PlanPhasePriceOverride>of(), null, null, false, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addDays(1);
+
+        // Add an add-on while bundle is already blocked
+        final PlanPhaseSpecifier spec2 = new PlanPhaseSpecifier("cleaning-monthly", null);
+        busHandler.pushExpectedEvents(NextEvent.CREATE, NextEvent.BLOCK);
+        entitlementApi.addEntitlement(baseEntitlement.getBundleId(), spec2, ImmutableList.<PlanPhasePriceOverride>of(), null, null, false, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        clock.addMonths(1);
+
+        busHandler.pushExpectedEvents(NextEvent.BLOCK, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        final BlockingState blockingState2 = new DefaultBlockingState(account.getId(), BlockingStateType.ACCOUNT, "state2", "Service", false, false, false, null);
+        subscriptionApi.addBlockingState(blockingState2, null, ImmutableList.<PluginProperty>of(), callContext);
+        assertListenerStatus();
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addMonths(1);
+        assertListenerStatus();
+    }
+
     private BaseEntitlementWithAddOnsSpecifier buildBaseEntitlementWithAddOnsSpecifier(final LocalDate entitlementMigrationDate, final LocalDate billingMigrationDate, final String externalKey, final List<EntitlementSpecifier> specifierList) {
         return new BaseEntitlementWithAddOnsSpecifier() {
                 @Override
@@ -349,5 +513,4 @@ public class TestMigrationSubscriptions extends TestIntegrationBase {
                 }
             };
     }
-
 }
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/api/BlockingStateOrdering.java b/entitlement/src/main/java/org/killbill/billing/entitlement/api/BlockingStateOrdering.java
index 0c1c5b7..6648e03 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/api/BlockingStateOrdering.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/api/BlockingStateOrdering.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -19,6 +19,7 @@ package org.killbill.billing.entitlement.api;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,6 +43,7 @@ import org.killbill.billing.entitlement.block.BlockingChecker.BlockingAggregator
 import org.killbill.billing.entitlement.block.DefaultBlockingChecker.DefaultBlockingAggregator;
 import org.killbill.billing.junction.DefaultBlockingState;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -53,13 +55,13 @@ import com.google.common.collect.Sets;
 // Given an event stream (across one or multiple entitlements), insert the blocking events at the right place
 public class BlockingStateOrdering extends EntitlementOrderingBase {
 
-    private static final BlockingStateOrdering INSTANCE = new BlockingStateOrdering();
+    @VisibleForTesting
+    static final BlockingStateOrdering INSTANCE = new BlockingStateOrdering();
 
     private BlockingStateOrdering() {}
 
     public static void insertSorted(final Iterable<Entitlement> entitlements, final InternalTenantContext internalTenantContext, final LinkedList<SubscriptionEvent> inputAndOutputResult) {
         INSTANCE.computeEvents(entitlements, internalTenantContext, inputAndOutputResult);
-
     }
 
     private void computeEvents(final Iterable<Entitlement> entitlements, final InternalTenantContext internalTenantContext, final LinkedList<SubscriptionEvent> inputAndOutputResult) {
@@ -71,6 +73,14 @@ public class BlockingStateOrdering extends EntitlementOrderingBase {
             blockingStates.addAll(((DefaultEntitlement) entitlement).getEventsStream().getBlockingStates());
         }
 
+        computeEvents(new LinkedList<UUID>(allEntitlementUUIDs), blockingStates, internalTenantContext, inputAndOutputResult);
+    }
+
+    @VisibleForTesting
+    void computeEvents(final LinkedList<UUID> allEntitlementUUIDs, final Collection<BlockingState> blockingStates, final InternalTenantContext internalTenantContext, final LinkedList<SubscriptionEvent> inputAndOutputResult) {
+        // Make sure the ordering is stable
+        Collections.sort(allEntitlementUUIDs);
+
         final SupportForOlderVersionThan_0_17_X backwardCompatibleContext = new SupportForOlderVersionThan_0_17_X(inputAndOutputResult, blockingStates);
 
         // Trust the incoming ordering here: blocking states were sorted using ProxyBlockingStateDao#sortedCopy
@@ -107,12 +117,7 @@ public class BlockingStateOrdering extends EntitlementOrderingBase {
                     shouldContinue = false;
                     break;
                 case 0:
-                    // In case of exact same date, we want to make sure that a START_ENTITLEMENT event gets correctly populated when the STOP_BILLING is also on the same date
-                    if (currentBlockingState.getStateName().equals(DefaultEntitlementApi.ENT_STATE_START) && cur.getSubscriptionEventType() != SubscriptionEventType.STOP_BILLING) {
-                        shouldContinue = false;
-                    } else {
-                        shouldContinue = true;
-                    }
+                    shouldContinue = compareBlockingStateWithNextSubscriptionEvent(currentBlockingState, cur) > 0;
                     break;
                 case 1:
                     shouldContinue = true;
@@ -169,9 +174,75 @@ public class BlockingStateOrdering extends EntitlementOrderingBase {
                 outputNewEvents.add(toSubscriptionEvent(prevNext[0], prevNext[1], targetEntitlementId, currentBlockingState, t, internalTenantContext));
             }
         }
+
         return index;
     }
 
+    private int compareBlockingStateWithNextSubscriptionEvent(final BlockingState blockingState, final SubscriptionEvent next) {
+        final String serviceName = blockingState.getService();
+
+        // For consistency, make sure entitlement-service and billing-service events always happen in a
+        // deterministic order (e.g. after other services for STOP events and before for START events)
+        if ((DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(serviceName) ||
+             BILLING_SERVICE_NAME.equals(serviceName) ||
+             ENT_BILLING_SERVICE_NAME.equals(serviceName)) &&
+            !(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(next.getServiceName()) ||
+              BILLING_SERVICE_NAME.equals(next.getServiceName()) ||
+              ENT_BILLING_SERVICE_NAME.equals(next.getServiceName()))) {
+            // first is an entitlement-service or billing-service event, but not second
+            if (blockingState.isBlockBilling() || blockingState.isBlockEntitlement()) {
+                // PAUSE_ and STOP_ events go last
+                return 1;
+            } else {
+                return -1;
+            }
+        } else if ((DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(next.getServiceName()) ||
+                    BILLING_SERVICE_NAME.equals(next.getServiceName()) ||
+                    ENT_BILLING_SERVICE_NAME.equals(next.getServiceName())) &&
+                   !(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(serviceName) ||
+                     BILLING_SERVICE_NAME.equals(serviceName) ||
+                     ENT_BILLING_SERVICE_NAME.equals(serviceName))) {
+            // second is an entitlement-service or billing-service event, but not first
+            if (next.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT) ||
+                next.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING) ||
+                next.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_ENTITLEMENT) ||
+                next.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_BILLING) ||
+                next.getSubscriptionEventType().equals(SubscriptionEventType.PHASE) ||
+                next.getSubscriptionEventType().equals(SubscriptionEventType.CHANGE)) {
+                return 1;
+            } else if (next.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_ENTITLEMENT) ||
+                       next.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_BILLING) ||
+                       next.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT) ||
+                       next.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
+                return -1;
+            } else {
+                // Default behavior
+                return 1;
+            }
+        } else if (isStartEntitlement(blockingState)) {
+            // START_ENTITLEMENT is always first
+            return -1;
+        } else if (next.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT)) {
+            // START_ENTITLEMENT is always first
+            return 1;
+        } else if (next.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
+            // STOP_BILLING is always last
+            return -1;
+        } else if (next.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING)) {
+            // START_BILLING is first after START_ENTITLEMENT
+            return 1;
+        } else if (isStopEntitlement(blockingState)) {
+            // STOP_ENTITLEMENT is last after STOP_BILLING
+            return 1;
+        } else if (next.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT)) {
+            // STOP_ENTITLEMENT is last after STOP_BILLING
+            return -1;
+        } else {
+            // Trust the current ordering
+            return 1;
+        }
+    }
+
     // Extract prev and next events in the stream events for that particular target subscription from the insertionEvent
     private SubscriptionEvent[] findPrevNext(final List<SubscriptionEvent> events, final UUID targetEntitlementId, final SubscriptionEvent insertionEvent) {
         // Find prev/next event for the same entitlement
@@ -390,11 +461,11 @@ public class BlockingStateOrdering extends EntitlementOrderingBase {
                                                                               bs.getEffectiveDate());
 
             final List<SubscriptionEventType> result = new ArrayList<SubscriptionEventType>(4);
-            if (fixedBlockingState.getStateName().equals(DefaultEntitlementApi.ENT_STATE_START)) {
+            if (isStartEntitlement(fixedBlockingState)) {
                 isEntitlementStarted = true;
                 result.add(SubscriptionEventType.START_ENTITLEMENT);
                 return result;
-            } else if (fixedBlockingState.getStateName().equals(DefaultEntitlementApi.ENT_STATE_CANCELLED)) {
+            } else if (isStopEntitlement(fixedBlockingState)) {
                 isEntitlementStopped = true;
                 result.add(SubscriptionEventType.STOP_ENTITLEMENT);
                 return result;
@@ -450,6 +521,16 @@ public class BlockingStateOrdering extends EntitlementOrderingBase {
         }
     }
 
+    private static boolean isStartEntitlement(final BlockingState blockingState) {
+        return DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(blockingState.getService()) &&
+               DefaultEntitlementApi.ENT_STATE_START.equals(blockingState.getStateName());
+    }
+
+    private static boolean isStopEntitlement(final BlockingState blockingState) {
+        return DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(blockingState.getService()) &&
+               DefaultEntitlementApi.ENT_STATE_CANCELLED.equals(blockingState.getStateName());
+    }
+
     //
     // The logic to add the missing START_ENTITLEMENT for older subscriptions is contained in this class. When we want/need to drop backward compatibility we can
     // simply drop this class and where it is called.
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/api/SubscriptionEventOrdering.java b/entitlement/src/main/java/org/killbill/billing/entitlement/api/SubscriptionEventOrdering.java
index 9283363..96b3d26 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/api/SubscriptionEventOrdering.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/api/SubscriptionEventOrdering.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -17,7 +17,6 @@
 
 package org.killbill.billing.entitlement.api;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -25,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.killbill.billing.callcontext.InternalTenantContext;
-import org.killbill.billing.entitlement.DefaultEntitlementService;
 import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseTransition;
@@ -42,8 +40,7 @@ import com.google.common.collect.ImmutableList;
 //
 public class SubscriptionEventOrdering extends EntitlementOrderingBase {
 
-    @VisibleForTesting
-    static final SubscriptionEventOrdering INSTANCE = new SubscriptionEventOrdering();
+    private static final SubscriptionEventOrdering INSTANCE = new SubscriptionEventOrdering();
 
     private SubscriptionEventOrdering() {}
 
@@ -63,7 +60,6 @@ public class SubscriptionEventOrdering extends EntitlementOrderingBase {
         BlockingStateOrdering.insertSorted(entitlements, internalTenantContext, result);
 
         // Final cleanups
-        reOrderSubscriptionEventsOnSameDateByType(result);
         removeOverlappingSubscriptionEvents(result);
 
         return result;
@@ -146,7 +142,8 @@ public class SubscriptionEventOrdering extends EntitlementOrderingBase {
         result.add(index, event);
     }
 
-    private SubscriptionEvent toSubscriptionEvent(final SubscriptionBaseTransition in, final SubscriptionEventType eventType, final InternalTenantContext internalTenantContext) {
+    @VisibleForTesting
+    static SubscriptionEvent toSubscriptionEvent(final SubscriptionBaseTransition in, final SubscriptionEventType eventType, final InternalTenantContext internalTenantContext) {
         return new DefaultSubscriptionEvent(in.getId(),
                                             in.getSubscriptionId(),
                                             in.getEffectiveTransitionTime(),
@@ -169,47 +166,6 @@ public class SubscriptionEventOrdering extends EntitlementOrderingBase {
                                             internalTenantContext);
     }
 
-    //
-    // All events have been inserted and should be at the right place, except that we want to ensure that events for a given subscription,
-    // and for a given time are ordered by SubscriptionEventType.
-    //
-    // All this seems a little over complicated, and one wonders why we don't just shove all events and call Collections.sort on the list prior
-    // to return:
-    // - One explanation is that we don't know the events in advance and each time the new events to be inserted are computed from the current state
-    //   of the stream, which requires ordering all along
-    // - A careful reader will notice that the algorithm is N^2, -- so that we care so much considering we have very events -- but in addition to that
-    //   the recursive path will be used very infrequently and when it is used, this will be probably just reorder with the prev event and that's it.
-    //
-    @VisibleForTesting
-    protected void reOrderSubscriptionEventsOnSameDateByType(final List<SubscriptionEvent> events) {
-        final int size = events.size();
-        for (int i = 0; i < size; i++) {
-            final SubscriptionEvent cur = events.get(i);
-            final SubscriptionEvent next = (i < (size - 1)) ? events.get(i + 1) : null;
-
-            final boolean shouldSwap = (next != null && shouldSwap(cur, next, true));
-            final boolean shouldReverseSort = (next == null || shouldSwap);
-
-            int currentIndex = i;
-            if (shouldSwap) {
-                Collections.swap(events, i, i + 1);
-            }
-            if (shouldReverseSort) {
-                while (currentIndex >= 1) {
-                    final SubscriptionEvent revCur = events.get(currentIndex);
-                    final SubscriptionEvent other = events.get(currentIndex - 1);
-                    if (shouldSwap(revCur, other, false)) {
-                        Collections.swap(events, currentIndex, currentIndex - 1);
-                    }
-                    if (revCur.getEffectiveDate().compareTo(other.getEffectiveDate()) != 0) {
-                        break;
-                    }
-                    currentIndex--;
-                }
-            }
-        }
-    }
-
     // Make sure the argument supports the remove operation - hence expect a LinkedList, not a List
     private void removeOverlappingSubscriptionEvents(final LinkedList<SubscriptionEvent> events) {
         final Iterator<SubscriptionEvent> iterator = events.iterator();
@@ -228,92 +184,4 @@ public class SubscriptionEventOrdering extends EntitlementOrderingBase {
             }
         }
     }
-
-    private boolean shouldSwap(final SubscriptionEvent cur, final SubscriptionEvent other, final boolean isAscending) {
-        // For a given date, order by subscriptionId, and within subscription by event type
-        final int idComp = cur.getEntitlementId().compareTo(other.getEntitlementId());
-        final Integer comparison = compareSubscriptionEventsForSameEffectiveDateAndEntitlementId(cur, other);
-        return (cur.getEffectiveDate().compareTo(other.getEffectiveDate()) == 0 &&
-                ((isAscending &&
-                  ((idComp > 0) ||
-                   (idComp == 0 && comparison != null && comparison > 0))) ||
-                 (!isAscending &&
-                  ((idComp < 0) ||
-                   (idComp == 0 && comparison != null && comparison < 0)))));
-    }
-
-    private Integer compareSubscriptionEventsForSameEffectiveDateAndEntitlementId(final SubscriptionEvent first, final SubscriptionEvent second) {
-        // For consistency, make sure entitlement-service and billing-service events always happen in a
-        // deterministic order (e.g. after other services for STOP events and before for START events)
-        if ((DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(first.getServiceName()) ||
-             BILLING_SERVICE_NAME.equals(first.getServiceName())) &&
-            !(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(second.getServiceName()) ||
-              BILLING_SERVICE_NAME.equals(second.getServiceName()))) {
-            // first is an entitlement-service or billing-service event, but not second
-            if (first.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT) ||
-                first.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING) ||
-                first.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_ENTITLEMENT) ||
-                first.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_BILLING) ||
-                first.getSubscriptionEventType().equals(SubscriptionEventType.PHASE) ||
-                first.getSubscriptionEventType().equals(SubscriptionEventType.CHANGE)) {
-                return -1;
-            } else if (first.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_ENTITLEMENT) ||
-                       first.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_BILLING) ||
-                       first.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT) ||
-                       first.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
-                return 1;
-            } else {
-                // Default behavior
-                return -1;
-            }
-        } else if ((DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(second.getServiceName()) ||
-                    BILLING_SERVICE_NAME.equals(second.getServiceName())) &&
-                   !(DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME.equals(first.getServiceName()) ||
-                     BILLING_SERVICE_NAME.equals(first.getServiceName()))) {
-            // second is an entitlement-service or billing-service event, but not first
-            if (second.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT) ||
-                second.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING) ||
-                second.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_ENTITLEMENT) ||
-                second.getSubscriptionEventType().equals(SubscriptionEventType.RESUME_BILLING) ||
-                second.getSubscriptionEventType().equals(SubscriptionEventType.PHASE) ||
-                second.getSubscriptionEventType().equals(SubscriptionEventType.CHANGE)) {
-                return 1;
-            } else if (second.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_ENTITLEMENT) ||
-                       second.getSubscriptionEventType().equals(SubscriptionEventType.PAUSE_BILLING) ||
-                       second.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT) ||
-                       second.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
-                return -1;
-            } else {
-                // Default behavior
-                return 1;
-            }
-        } else if (first.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT)) {
-            // START_ENTITLEMENT is always first
-            return -1;
-        } else if (second.getSubscriptionEventType().equals(SubscriptionEventType.START_ENTITLEMENT)) {
-            // START_ENTITLEMENT is always first
-            return 1;
-        } else if (first.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
-            // STOP_BILLING is always last
-            return 1;
-        } else if (second.getSubscriptionEventType().equals(SubscriptionEventType.STOP_BILLING)) {
-            // STOP_BILLING is always last
-            return -1;
-        } else if (first.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING)) {
-            // START_BILLING is first after START_ENTITLEMENT
-            return -1;
-        } else if (second.getSubscriptionEventType().equals(SubscriptionEventType.START_BILLING)) {
-            // START_BILLING is first after START_ENTITLEMENT
-            return 1;
-        } else if (first.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT)) {
-            // STOP_ENTITLEMENT is last after STOP_BILLING
-            return 1;
-        } else if (second.getSubscriptionEventType().equals(SubscriptionEventType.STOP_ENTITLEMENT)) {
-            // STOP_ENTITLEMENT is last after STOP_BILLING
-            return -1;
-        } else {
-            // Trust the current ordering
-            return null;
-        }
-    }
 }
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestBlockingStateOrdering.java b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestBlockingStateOrdering.java
new file mode 100644
index 0000000..02a7581
--- /dev/null
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestBlockingStateOrdering.java
@@ -0,0 +1,378 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.entitlement.api;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.entitlement.DefaultEntitlementService;
+import org.killbill.billing.entitlement.EntitlementTestSuiteNoDB;
+import org.killbill.billing.junction.DefaultBlockingState;
+import org.killbill.billing.subscription.api.user.SubscriptionBaseTransition;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+// invocationCount > 1 to verify flakiness
+public class TestBlockingStateOrdering extends EntitlementTestSuiteNoDB {
+
+    private long globalOrdering = 0;
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testIgnore_ENTITLEMENT_SERVICE_NAME_WithNoFlag() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now.plusDays(1)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 3);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void test_ENT_STATE_IsNotInterpreted() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, "svc1", false, false, now.plusDays(1)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_CANCELLED, "svc1", false, false, now.plusDays(2)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.SERVICE_STATE_CHANGE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.SERVICE_STATE_CHANGE);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseAtStart() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, true, now));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPausePostPhase_0_17_X() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, true, now.plusDays(40)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPausePostPhase() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, true, now.plusDays(40)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseAtPhase() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, true, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseResumeAtPhase() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, true, now.plusDays(30)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", false, false, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 7);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+        Assert.assertEquals(allEvents.get(5).getSubscriptionEventType(), SubscriptionEventType.RESUME_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(6).getSubscriptionEventType(), SubscriptionEventType.RESUME_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseAccountAtPhase() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(UUID.randomUUID(), BlockingStateType.ACCOUNT, "stuff", "svc1", true, true, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 5);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testDifferentTypesOfBlockingSameService() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(UUID.randomUUID(), BlockingStateType.ACCOUNT, "stuff", "svc1", false, true, now.plusDays(10)));
+        // Same service
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", true, false, now.plusDays(15)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc1", false, false, now.plusDays(20)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.ACCOUNT, "stuff", "svc1", false, false, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 8);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.RESUME_BILLING);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(5).getSubscriptionEventType(), SubscriptionEventType.RESUME_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(6).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(7).getSubscriptionEventType(), SubscriptionEventType.SERVICE_STATE_CHANGE);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testDifferentTypesOfBlockingDifferentServices() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final UUID subscriptionId1 = UUID.randomUUID();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(UUID.randomUUID(), BlockingStateType.ACCOUNT, "stuff", "svc1", false, true, now.plusDays(10)));
+        // Different service
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc2", true, false, now.plusDays(15)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, "stuff", "svc2", false, false, now.plusDays(20)));
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.ACCOUNT, "stuff", "svc1", false, false, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 7);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.RESUME_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(5).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        Assert.assertEquals(allEvents.get(6).getSubscriptionEventType(), SubscriptionEventType.RESUME_BILLING);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseAccountAtPhaseAndPauseOtherSubscriptionFutureStartedV1() throws Exception {
+        final UUID subscriptionId1 = UUID.randomUUID();
+        UUID subscriptionId2 = UUID.randomUUID();
+        while (subscriptionId2.compareTo(subscriptionId1) <= 0) {
+            subscriptionId2 = UUID.randomUUID();
+        }
+        testPauseAccountAtPhaseAndPauseOtherSubscriptionFutureStarted(subscriptionId1, subscriptionId2);
+    }
+
+    @Test(groups = "fast", invocationCount = 10)
+    public void testPauseAccountAtPhaseAndPauseOtherSubscriptionFutureStartedV2() throws Exception {
+        final UUID subscriptionId1 = UUID.randomUUID();
+        UUID subscriptionId2 = UUID.randomUUID();
+        while (subscriptionId2.compareTo(subscriptionId1) >= 0) {
+            subscriptionId2 = UUID.randomUUID();
+        }
+        testPauseAccountAtPhaseAndPauseOtherSubscriptionFutureStarted(subscriptionId1, subscriptionId2);
+    }
+
+    private void testPauseAccountAtPhaseAndPauseOtherSubscriptionFutureStarted(final UUID subscriptionId1, final UUID subscriptionId2) throws Exception {
+        final DateTime now = clock.getUTCNow();
+
+        final Collection<BlockingState> blockingStates = new LinkedList<BlockingState>();
+        blockingStates.add(createBlockingState(subscriptionId1, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(subscriptionId2, BlockingStateType.SUBSCRIPTION, DefaultEntitlementApi.ENT_STATE_START, DefaultEntitlementService.ENTITLEMENT_SERVICE_NAME, false, false, now));
+        blockingStates.add(createBlockingState(UUID.randomUUID(), BlockingStateType.ACCOUNT, "stuff", "svc1", true, true, now.plusDays(30)));
+
+        final LinkedList<SubscriptionEvent> allEvents = new LinkedList<SubscriptionEvent>();
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.START_BILLING, now));
+        allEvents.add(createEvent(subscriptionId1, SubscriptionEventType.PHASE, now.plusDays(30)));
+        allEvents.add(createEvent(subscriptionId2, SubscriptionEventType.START_BILLING, now.plusDays(40)));
+
+        computeEvents(allEvents, blockingStates);
+
+        Assert.assertEquals(allEvents.size(), 8);
+        Assert.assertEquals(allEvents.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(1).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
+        Assert.assertEquals(allEvents.get(2).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+        Assert.assertEquals(allEvents.get(3).getSubscriptionEventType(), SubscriptionEventType.PHASE);
+        if (subscriptionId1.compareTo(subscriptionId2) >= 0) {
+            Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+            Assert.assertEquals(allEvents.get(5).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+            Assert.assertEquals(allEvents.get(6).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+        } else {
+            Assert.assertEquals(allEvents.get(4).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+            Assert.assertEquals(allEvents.get(5).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
+            Assert.assertEquals(allEvents.get(6).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
+        }
+        Assert.assertEquals(allEvents.get(7).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
+    }
+
+    private BlockingState createBlockingState(final UUID blockedId,
+                                              final BlockingStateType blockingStateType,
+                                              final String stateName,
+                                              final String service,
+                                              final boolean blockEntitlement,
+                                              final boolean blockBilling,
+                                              final DateTime effectiveDate) {
+        return new DefaultBlockingState(UUID.randomUUID(),
+                                        blockedId,
+                                        blockingStateType,
+                                        stateName,
+                                        service,
+                                        false,
+                                        blockEntitlement,
+                                        blockBilling,
+                                        effectiveDate,
+                                        effectiveDate,
+                                        effectiveDate,
+                                        globalOrdering++);
+    }
+
+    // Re-use SubscriptionEventOrdering method, as it's the input of BlockingStateOrdering
+    private SubscriptionEvent createEvent(final UUID subscriptionId, final SubscriptionEventType type, final DateTime effectiveDate) {
+        final SubscriptionBaseTransition subscriptionBaseTransition = Mockito.mock(SubscriptionBaseTransition.class);
+        Mockito.when(subscriptionBaseTransition.getId()).thenReturn(UUID.randomUUID());
+        Mockito.when(subscriptionBaseTransition.getSubscriptionId()).thenReturn(subscriptionId);
+        Mockito.when(subscriptionBaseTransition.getEffectiveTransitionTime()).thenReturn(effectiveDate);
+        return SubscriptionEventOrdering.toSubscriptionEvent(subscriptionBaseTransition, type, internalCallContext);
+    }
+
+    private void computeEvents(final LinkedList<SubscriptionEvent> allEvents, final Collection<BlockingState> blockingStates) {
+        final Collection<UUID> allEntitlementUUIDs = new HashSet<UUID>();
+        for (final SubscriptionEvent subscriptionEvent : allEvents) {
+            allEntitlementUUIDs.add(subscriptionEvent.getEntitlementId());
+        }
+        for (final BlockingState blockingState : blockingStates) {
+            if (blockingState.getType() == BlockingStateType.SUBSCRIPTION) {
+                allEntitlementUUIDs.add(blockingState.getBlockedId());
+            }
+        }
+
+        BlockingStateOrdering.INSTANCE.computeEvents(new LinkedList<UUID>(allEntitlementUUIDs), blockingStates, internalCallContext, allEvents);
+    }
+}
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultSubscriptionBundleTimeline.java b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultSubscriptionBundleTimeline.java
index fc7fb25..b82d6e8 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultSubscriptionBundleTimeline.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultSubscriptionBundleTimeline.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -41,7 +41,6 @@ import org.killbill.billing.subscription.api.user.SubscriptionBaseTransitionData
 import org.killbill.billing.subscription.events.SubscriptionBaseEvent.EventType;
 import org.killbill.billing.subscription.events.user.ApiEventType;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -62,209 +61,6 @@ public class TestDefaultSubscriptionBundleTimeline extends EntitlementTestSuiteN
         bundleExternalKey = bundleId.toString();
     }
 
-    public class TestSubscriptionBundleTimeline extends DefaultSubscriptionBundleTimeline {
-
-        public TestSubscriptionBundleTimeline(final UUID accountId, final UUID bundleId, final String externalKey, final Iterable<Entitlement> entitlements) {
-            super(accountId, bundleId, externalKey, entitlements, internalCallContext);
-        }
-
-        public SubscriptionEvent createEvent(final UUID subscriptionId, final SubscriptionEventType type, final DateTime effectiveDate) {
-            return new DefaultSubscriptionEvent(UUID.randomUUID(),
-                                                subscriptionId,
-                                                effectiveDate,
-                                                type,
-                                                true,
-                                                true,
-                                                "foo",
-                                                "bar",
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                null,
-                                                internalCallContext);
-
-        }
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrder1() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveDate = clock.getUTCNow();
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrder2() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveDate = clock.getUTCNow();
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrder3() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveDate = clock.getUTCNow();
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrderAndDifferentSubscriptionsSameDates1() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.fromString("60b64e0c-cefd-48c3-8de9-c731a9558165");
-
-        final UUID otherSubscriptionId = UUID.fromString("35b3b340-31b2-46ea-b062-e9fc9fab3bc9");
-        final DateTime effectiveDate = clock.getUTCNow();
-
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-        Assert.assertEquals(events.get(0).getEntitlementId(), otherSubscriptionId);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(4).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-        Assert.assertEquals(events.get(4).getEntitlementId(), subscriptionId);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrderAndDifferentSubscriptionsSameDates2() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.fromString("35b3b340-31b2-46ea-b062-e9fc9fab3bc9");
-        final UUID otherSubscriptionId = UUID.fromString("60b64e0c-cefd-48c3-8de9-c731a9558165");
-
-        final DateTime effectiveDate = clock.getUTCNow();
-
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-        Assert.assertEquals(events.get(3).getEntitlementId(), subscriptionId);
-        Assert.assertEquals(events.get(4).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-        Assert.assertEquals(events.get(4).getEntitlementId(), otherSubscriptionId);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnInvalidOrderAndDifferentSubscriptionsDates() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.randomUUID();
-
-        final UUID otherSubscriptionId = UUID.randomUUID();
-        final DateTime effectiveDate = clock.getUTCNow();
-        final DateTime otherEffectiveDate = clock.getUTCNow().plusDays(1);
-
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.START_BILLING, otherEffectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.START_ENTITLEMENT, otherEffectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, otherEffectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.STOP_BILLING, otherEffectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.PAUSE_ENTITLEMENT, otherEffectiveDate));
-        events.add(timeline.createEvent(otherSubscriptionId, SubscriptionEventType.PAUSE_BILLING, otherEffectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-
-        Assert.assertEquals(events.get(4).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(5).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(6).getSubscriptionEventType(), SubscriptionEventType.PAUSE_ENTITLEMENT);
-        Assert.assertEquals(events.get(7).getSubscriptionEventType(), SubscriptionEventType.PAUSE_BILLING);
-        Assert.assertEquals(events.get(8).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(9).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-    }
-
-    @Test(groups = "fast")
-    public void testReOrderSubscriptionEventsOnCorrectOrder() {
-        final TestSubscriptionBundleTimeline timeline = new TestSubscriptionBundleTimeline(null, null, null, new ArrayList<Entitlement>());
-
-        final List<SubscriptionEvent> events = new ArrayList<SubscriptionEvent>();
-        final UUID subscriptionId = UUID.randomUUID();
-        final DateTime effectiveDate = clock.getUTCNow();
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.START_BILLING, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_ENTITLEMENT, effectiveDate));
-        events.add(timeline.createEvent(subscriptionId, SubscriptionEventType.STOP_BILLING, effectiveDate));
-
-        SubscriptionEventOrdering.INSTANCE.reOrderSubscriptionEventsOnSameDateByType(events);
-
-        Assert.assertEquals(events.get(0).getSubscriptionEventType(), SubscriptionEventType.START_ENTITLEMENT);
-        Assert.assertEquals(events.get(1).getSubscriptionEventType(), SubscriptionEventType.START_BILLING);
-        Assert.assertEquals(events.get(2).getSubscriptionEventType(), SubscriptionEventType.STOP_ENTITLEMENT);
-        Assert.assertEquals(events.get(3).getSubscriptionEventType(), SubscriptionEventType.STOP_BILLING);
-    }
-
     @Test(groups = "fast")
     public void testOneSimpleEntitlement() throws CatalogApiException {
         testOneSimpleEntitlementImpl(false);
@@ -275,7 +71,6 @@ public class TestDefaultSubscriptionBundleTimeline extends EntitlementTestSuiteN
         testOneSimpleEntitlementImpl(true);
     }
 
-
     private void testOneSimpleEntitlementImpl(boolean regressionFlagForOlderVersionThan_0_17_X) throws CatalogApiException {
         clock.setDay(new LocalDate(2013, 1, 1));
 
@@ -348,18 +143,16 @@ public class TestDefaultSubscriptionBundleTimeline extends EntitlementTestSuiteN
         assertNull(events.get(3).getNextPhase());
     }
 
-
-    @Test(groups="fast", enabled = true)
+    @Test(groups="fast")
     public void testOneSimpleEntitlementCancelImmediately() throws CatalogApiException {
         testOneSimpleEntitlementCancelImmediatelyImpl(false);
     }
 
-    @Test(groups="fast", enabled = true)
+    @Test(groups="fast")
     public void testOneSimpleEntitlementCancelImmediatelyWithRegression() throws CatalogApiException {
         testOneSimpleEntitlementCancelImmediatelyImpl(true);
     }
 
-
     private void testOneSimpleEntitlementCancelImmediatelyImpl(boolean regressionFlagForOlderVersionThan_0_17_X) throws CatalogApiException {
 
         clock.setDay(new LocalDate(2013, 1, 1));
@@ -518,7 +311,6 @@ public class TestDefaultSubscriptionBundleTimeline extends EntitlementTestSuiteN
         testOneEntitlementWithPauseResumeImpl(true);
     }
 
-
     private void testOneEntitlementWithPauseResumeImpl(final boolean regressionFlagForOlderVersionThan_0_17_X) throws CatalogApiException {
         clock.setDay(new LocalDate(2013, 1, 1));
 
diff --git a/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingCalculator.java b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingCalculator.java
index 3ff89c4..286d1c7 100644
--- a/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingCalculator.java
+++ b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingCalculator.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
 import org.joda.time.DateTime;
-import org.joda.time.Days;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.BillingPeriod;
 import org.killbill.billing.catalog.api.Catalog;
@@ -51,10 +50,13 @@ import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 
 public class BlockingCalculator {
@@ -64,29 +66,6 @@ public class BlockingCalculator {
     private final BlockingInternalApi blockingApi;
     private final CatalogInternalApi catalogInternalApi;
 
-    protected static class DisabledDuration {
-
-        private final DateTime start;
-        private DateTime end;
-
-        public DisabledDuration(final DateTime start, final DateTime end) {
-            this.start = start;
-            this.end = end;
-        }
-
-        public DateTime getStart() {
-            return start;
-        }
-
-        public DateTime getEnd() {
-            return end;
-        }
-
-        public void setEnd(final DateTime end) {
-            this.end = end;
-        }
-    }
-
     @Inject
     public BlockingCalculator(final BlockingInternalApi blockingApi, final CatalogInternalApi catalogInternalApi) {
         this.blockingApi = blockingApi;
@@ -134,8 +113,13 @@ public class BlockingCalculator {
                 final List<BlockingState> aggregateSubscriptionBlockingEvents = getAggregateBlockingEventsPerSubscription(subscription.getEndDate(), subscriptionBlockingEvents, bundleBlockingEvents, accountBlockingEvents);
                 final List<DisabledDuration> accountBlockingDurations = createBlockingDurations(aggregateSubscriptionBlockingEvents);
 
-                billingEventsToAdd.addAll(createNewEvents(accountBlockingDurations, billingEvents, subscription, context));
-                billingEventsToRemove.addAll(eventsToRemove(accountBlockingDurations, billingEvents, subscription));
+                final SortedSet<BillingEvent> subscriptionBillingEvents = filter(billingEvents, subscription);
+
+                final SortedSet<BillingEvent> newEvents = createNewEvents(accountBlockingDurations, subscriptionBillingEvents, context);
+                billingEventsToAdd.addAll(newEvents);
+
+                final SortedSet<BillingEvent> removedEvents = eventsToRemove(accountBlockingDurations, subscriptionBillingEvents);
+                billingEventsToRemove.addAll(removedEvents);
             }
         }
 
@@ -183,14 +167,13 @@ public class BlockingCalculator {
     }
 
     protected SortedSet<BillingEvent> eventsToRemove(final List<DisabledDuration> disabledDuration,
-                                                     final SortedSet<BillingEvent> billingEvents, final SubscriptionBase subscription) {
+                                                     final SortedSet<BillingEvent> subscriptionBillingEvents) {
         final SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();
 
-        final SortedSet<BillingEvent> filteredBillingEvents = filter(billingEvents, subscription);
         for (final DisabledDuration duration : disabledDuration) {
-            for (final BillingEvent event : filteredBillingEvents) {
+            for (final BillingEvent event : subscriptionBillingEvents) {
                 if (duration.getEnd() == null || event.getEffectiveDate().isBefore(duration.getEnd())) {
-                    if (event.getEffectiveDate().isAfter(duration.getStart())) { //between the pair
+                    if (!event.getEffectiveDate().isBefore(duration.getStart())) {
                         result.add(event);
                     }
                 } else { //after the last event of the pair no need to keep checking
@@ -201,7 +184,7 @@ public class BlockingCalculator {
         return result;
     }
 
-    protected SortedSet<BillingEvent> createNewEvents(final List<DisabledDuration> disabledDuration, final SortedSet<BillingEvent> billingEvents, final SubscriptionBase subscription, final InternalTenantContext context) throws CatalogApiException {
+    protected SortedSet<BillingEvent> createNewEvents(final List<DisabledDuration> disabledDuration, final SortedSet<BillingEvent> subscriptionBillingEvents, final InternalTenantContext context) throws CatalogApiException {
 
         Preconditions.checkState(context.getAccountRecordId() != null);
 
@@ -210,12 +193,12 @@ public class BlockingCalculator {
 
         for (final DisabledDuration duration : disabledDuration) {
             // The first one before the blocked duration
-            final BillingEvent precedingInitialEvent = precedingBillingEventForSubscription(duration.getStart(), billingEvents, subscription);
+            final BillingEvent precedingInitialEvent = precedingBillingEventForSubscription(duration.getStart(), subscriptionBillingEvents);
             // The last one during of before the duration
-            final BillingEvent precedingFinalEvent = precedingBillingEventForSubscription(duration.getEnd(), billingEvents, subscription);
+            final BillingEvent precedingFinalEvent = precedingBillingEventForSubscription(duration.getEnd(), subscriptionBillingEvents);
 
             if (precedingInitialEvent != null) { // there is a preceding billing event
-                result.add(createNewDisableEvent(duration.getStart(), precedingInitialEvent, catalog, context));
+                result.add(createNewDisableEvent(duration.getStart(), precedingInitialEvent, catalog));
                 if (duration.getEnd() != null) { // no second event in the pair means they are still disabled (no re-enable)
                     result.add(createNewReenableEvent(duration.getEnd(), precedingFinalEvent, catalog, context));
                 }
@@ -227,43 +210,40 @@ public class BlockingCalculator {
         return result;
     }
 
-    protected BillingEvent precedingBillingEventForSubscription(final DateTime datetime, final SortedSet<BillingEvent> billingEvents, final SubscriptionBase subscription) {
-        if (datetime == null) { //second of a pair can be null if there's no re-enabling
-            return null;
-        }
-
-        final SortedSet<BillingEvent> filteredBillingEvents = filter(billingEvents, subscription);
-        BillingEvent result = filteredBillingEvents.first();
 
-        if (datetime.isBefore(result.getEffectiveDate())) {
-            //This case can happen, for example, if we have an add on and the bundle goes into disabled before the add on is created
+    protected BillingEvent precedingBillingEventForSubscription(final DateTime disabledDurationStart, final SortedSet<BillingEvent> subscriptionBillingEvents) {
+        if (disabledDurationStart == null) {
             return null;
         }
 
-        for (final BillingEvent event : filteredBillingEvents) {
-            if (!event.getEffectiveDate().isBefore(datetime)) { // found it its the previous event
-                return result;
-            } else { // still looking
-                result = event;
+        // We look for the first billingEvent strictly prior our disabledDurationStart or null if none
+        BillingEvent prev = null;
+        for (final BillingEvent event : subscriptionBillingEvents) {
+            if (!event.getEffectiveDate().isBefore(disabledDurationStart)) {
+                return prev;
+            } else {
+                prev = event;
             }
         }
-        return result;
+        return prev;
     }
 
+
     protected SortedSet<BillingEvent> filter(final SortedSet<BillingEvent> billingEvents, final SubscriptionBase subscription) {
         final SortedSet<BillingEvent> result = new TreeSet<BillingEvent>();
         for (final BillingEvent event : billingEvents) {
-            if (event.getSubscription() == subscription) {
+            if (event.getSubscription().getId().equals(subscription.getId())) {
                 result.add(event);
             }
         }
         return result;
     }
 
-    protected BillingEvent createNewDisableEvent(final DateTime odEventTime, final BillingEvent previousEvent, final Catalog catalog, final InternalTenantContext context) throws CatalogApiException {
+    protected BillingEvent createNewDisableEvent(final DateTime disabledDurationStart, final BillingEvent previousEvent, final Catalog catalog) throws CatalogApiException {
+
         final int billCycleDay = previousEvent.getBillCycleDayLocal();
         final SubscriptionBase subscription = previousEvent.getSubscription();
-        final DateTime effectiveDate = odEventTime;
+        final DateTime effectiveDate = disabledDurationStart;
         final PlanPhase planPhase = previousEvent.getPlanPhase();
         final Plan plan = previousEvent.getPlan();
 
@@ -320,55 +300,54 @@ public class BlockingCalculator {
     }
 
     // In ascending order
-    protected List<DisabledDuration> createBlockingDurations(final Iterable<BlockingState> overdueBundleEvents) {
-        final List<DisabledDuration> result = new ArrayList<BlockingCalculator.DisabledDuration>();
-        // Earliest blocking event
-        BlockingState first = null;
-
-        int blockedNesting = 0;
-        BlockingState lastOne = null;
-        for (final BlockingState e : overdueBundleEvents) {
-            lastOne = e;
-            if (e.isBlockBilling() && blockedNesting == 0) {
-                // First blocking event of contiguous series of blocking events
-                first = e;
-                blockedNesting++;
-            } else if (e.isBlockBilling() && blockedNesting > 0) {
-                // Nest blocking states
-                blockedNesting++;
-            } else if (!e.isBlockBilling() && blockedNesting > 0) {
-                blockedNesting--;
-                if (blockedNesting == 0) {
-                    // End of the interval
-                    addDisabledDuration(result, first, e);
-                    first = null;
-                }
+    protected List<DisabledDuration> createBlockingDurations(final Iterable<BlockingState> inputBundleEvents) {
+
+        final List<DisabledDuration> result = new ArrayList<DisabledDuration>();
+
+        final Set<String> services = ImmutableSet.copyOf(Iterables.transform(inputBundleEvents, new Function<BlockingState, String>() {
+            @Override
+            public String apply(final BlockingState input) {
+                return input.getService();
             }
-        }
+        }));
 
-        if (first != null) { // found a transition to disabled with no terminating event
-            addDisabledDuration(result, first, lastOne.isBlockBilling() ? null : lastOne);
+        final Map<String, BlockingStateService> svcBlockedMap = new HashMap<String, BlockingStateService>();
+        for (String svc : services) {
+            svcBlockedMap.put(svc, new BlockingStateService());
         }
 
-        return result;
-    }
-
-    private void addDisabledDuration(final List<DisabledDuration> result, final BlockingState firstBlocking, @Nullable final BlockingState firstNonBlocking) {
-        final DisabledDuration lastOne;
-        if (!result.isEmpty()) {
-            lastOne = result.get(result.size() - 1);
-        } else {
-            lastOne = null;
+        for (final BlockingState e : inputBundleEvents) {
+            svcBlockedMap.get(e.getService()).addBlockingState(e);
         }
 
-        final DateTime startDate = firstBlocking.getEffectiveDate();
-        final DateTime endDate = firstNonBlocking == null ? null : firstNonBlocking.getEffectiveDate();
-        if (lastOne != null && lastOne.getEnd().compareTo(startDate) == 0) {
-            lastOne.setEnd(endDate);
-        } else if (endDate == null || Days.daysBetween(startDate, endDate).getDays() >= 1) {
-            // Don't disable for periods less than a day (see https://github.com/killbill/killbill/issues/267)
-            result.add(new DisabledDuration(startDate, endDate));
+        final Iterable<DisabledDuration> unorderedDisabledDuration = Iterables.concat(Iterables.transform(svcBlockedMap.values(), new Function<BlockingStateService, List<DisabledDuration>>() {
+            @Override
+            public List<DisabledDuration> apply(final BlockingStateService input) {
+                return input.build();
+            }
+        }));
+
+        final List<DisabledDuration> sortedDisabledDuration = Ordering.natural().sortedCopy(unorderedDisabledDuration);
+
+        DisabledDuration prevDuration = null;
+        for (DisabledDuration d : sortedDisabledDuration) {
+            // isDisjoint
+            if (prevDuration == null) {
+                prevDuration = d;
+            } else {
+                if (prevDuration.isDisjoint(d)) {
+                    result.add(prevDuration);
+                    prevDuration = d;
+                } else {
+                    prevDuration = DisabledDuration.mergeDuration(prevDuration, d);
+                }
+            }
+        }
+        if (prevDuration != null) {
+            result.add(prevDuration);
         }
+
+        return result;
     }
 
     @VisibleForTesting
diff --git a/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingStateService.java b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingStateService.java
new file mode 100644
index 0000000..2e2dc56
--- /dev/null
+++ b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/BlockingStateService.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.junction.plumbing.billing;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.DateTime;
+import org.joda.time.Days;
+import org.killbill.billing.entitlement.api.BlockingState;
+
+public class BlockingStateService {
+
+    final Map<UUID, BlockingState> perObjectTypeFirstBlockingState = new HashMap<UUID, BlockingState>();
+
+    private final List<DisabledDuration> result;
+
+    public BlockingStateService() {
+        this.result = new ArrayList<DisabledDuration>();
+    }
+
+    public List<DisabledDuration> build() {
+        for (BlockingState cur : perObjectTypeFirstBlockingState.values()) {
+            if (cur != null) {
+                addDisabledDuration(cur, null);
+            }
+        }
+        return result;
+    }
+
+    public void addBlockingState(final BlockingState currentBlockingState) {
+
+        final BlockingState firstBlockingState = perObjectTypeFirstBlockingState.get(currentBlockingState.getBlockedId());
+        if (currentBlockingState.isBlockBilling() && firstBlockingState == null) {
+            perObjectTypeFirstBlockingState.put(currentBlockingState.getBlockedId(), currentBlockingState);
+        } else if (!currentBlockingState.isBlockBilling() && firstBlockingState != null) {
+            addDisabledDuration(firstBlockingState, currentBlockingState.getEffectiveDate());
+            perObjectTypeFirstBlockingState.put(currentBlockingState.getBlockedId(), null);
+        }
+    }
+
+    private void addDisabledDuration(final BlockingState firstBlockingState, @Nullable final DateTime disableDurationEndDate) {
+
+        if (disableDurationEndDate == null || Days.daysBetween(firstBlockingState.getEffectiveDate(), disableDurationEndDate).getDays() >= 1) {
+            // Don't disable for periods less than a day (see https://github.com/killbill/killbill/issues/267)
+            result.add(new DisabledDuration(firstBlockingState.getEffectiveDate(), disableDurationEndDate));
+        }
+    }
+}
diff --git a/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/DisabledDuration.java b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/DisabledDuration.java
new file mode 100644
index 0000000..d8f7870
--- /dev/null
+++ b/junction/src/main/java/org/killbill/billing/junction/plumbing/billing/DisabledDuration.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.junction.plumbing.billing;
+
+import org.joda.time.DateTime;
+
+import com.google.common.base.Preconditions;
+
+class DisabledDuration implements Comparable<DisabledDuration> {
+
+    private final DateTime start;
+    private DateTime end;
+
+    public DisabledDuration(final DateTime start, final DateTime end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    public DateTime getStart() {
+        return start;
+    }
+
+    public DateTime getEnd() {
+        return end;
+    }
+
+    public void setEnd(final DateTime end) {
+        this.end = end;
+    }
+
+    // Order by start date first and then end date
+    @Override
+    public int compareTo(final DisabledDuration o) {
+        int result = start.compareTo(o.getStart());
+        if (result == 0) {
+            if (end == null && o.getEnd() == null) {
+                result = 0;
+            } else if (end != null && o.getEnd() != null) {
+                result = end.compareTo(o.getEnd());
+            } else if (o.getEnd() == null) {
+                return -1;
+            } else {
+                return 1;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DisabledDuration)) {
+            return false;
+        }
+
+        final DisabledDuration that = (DisabledDuration) o;
+
+        return compareTo(that) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = start != null ? start.hashCode() : 0;
+        result = 31 * result + (end != null ? end.hashCode() : 0);
+        return result;
+    }
+
+    //
+    //
+    //  Assumptions (based on ordering):
+    //   * this.start <= o.start
+    //   * this.end <= o.end when this.start == o.start
+    //
+    // Case 1: this contained into o => false
+    // |---------|       this
+    // |--------------|  o
+    //
+    // Case 2: this overlaps with o => false
+    // |---------|            this
+    //      |--------------|  o
+    //
+    // Case 3: o contains into this => false
+    // |---------| this
+    //      |---|  o
+    //
+    // Case 4: this and o are adjacent => false
+    // |---------| this
+    //           |---|  o
+    // Case 5: this and o are disjoint => true
+    // |---------| this
+    //             |---|  o
+    public boolean isDisjoint(final DisabledDuration o) {
+        return end!= null && end.compareTo(o.getStart()) < 0;
+    }
+
+    public static DisabledDuration mergeDuration(DisabledDuration d1, DisabledDuration d2) {
+        Preconditions.checkState(d1.getStart().compareTo(d2.getStart()) <=0 );
+        Preconditions.checkState(!d1.isDisjoint(d2));
+
+        final DateTime endDate = (d1.getEnd() != null && d2.getEnd() != null) ?
+                                 d1.getEnd().compareTo(d2.getEnd()) < 0 ? d2.getEnd() : d1.getEnd() :
+                                 null;
+
+        return new DisabledDuration(d1.getStart(), endDate);
+    }
+
+}
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingCalculator.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingCalculator.java
index 2d17bdc..472c996 100644
--- a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingCalculator.java
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingCalculator.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -47,7 +47,6 @@ import org.killbill.billing.entitlement.dao.MockBlockingStateDao;
 import org.killbill.billing.junction.BillingEvent;
 import org.killbill.billing.junction.DefaultBlockingState;
 import org.killbill.billing.junction.JunctionTestSuiteNoDB;
-import org.killbill.billing.junction.plumbing.billing.BlockingCalculator.DisabledDuration;
 import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.mockito.Mockito;
@@ -154,13 +153,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveOpenPrev() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
         billingEvents.add(createRealEvent(now.minusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 0);
     }
@@ -170,7 +169,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveOpenPrevFollow() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
@@ -179,7 +178,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         billingEvents.add(e1);
         billingEvents.add(e2);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e2);
@@ -190,14 +189,32 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveOpenFollow() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
         final BillingEvent e1 = createRealEvent(now.plusDays(1), subscription1);
         billingEvents.add(e1);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
+
+        assertEquals(results.size(), 1);
+        assertEquals(results.first(), e1);
+    }
+
+    // Open with no previous event (only at the same time)
+    // -----[X-----------------------------
+    @Test(groups = "fast")
+    public void testEventsToRemoveOpenSameTime() {
+        final DateTime now = clock.getUTCNow();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
+        final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
+
+        disabledDuration.add(new DisabledDuration(now, null));
+        final BillingEvent e1 = createRealEvent(now, subscription1);
+        billingEvents.add(e1);
+
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e1);
@@ -208,14 +225,14 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedPrev() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         final BillingEvent e1 = createRealEvent(now.minusDays(1), subscription1);
         billingEvents.add(e1);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 0);
     }
@@ -225,7 +242,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedPrevBetw() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
@@ -234,7 +251,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         billingEvents.add(e1);
         billingEvents.add(e2);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e2);
@@ -245,7 +262,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedPrevBetwNext() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
@@ -256,7 +273,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         billingEvents.add(e2);
         billingEvents.add(e3);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e2);
@@ -267,14 +284,14 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedBetwn() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         final BillingEvent e2 = createRealEvent(now.plusDays(1), subscription1);
         billingEvents.add(e2);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e2);
@@ -285,7 +302,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedBetweenFollow() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
@@ -295,7 +312,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         billingEvents.add(e2);
         billingEvents.add(e3);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first(), e2);
@@ -306,7 +323,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testEventsToRemoveClosedFollow() {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
@@ -315,7 +332,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
 
         billingEvents.add(e3);
 
-        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents, subscription1);
+        final SortedSet<BillingEvent> results = blockingCalculator.eventsToRemove(disabledDuration, billingEvents);
 
         assertEquals(results.size(), 0);
     }
@@ -325,13 +342,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsOpenPrev() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
         billingEvents.add(createRealEvent(now.minusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents,  internalCallContext);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first().getEffectiveDate(), now);
@@ -346,14 +363,14 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsOpenPrevFollow() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
         billingEvents.add(createRealEvent(now.minusDays(1), subscription1));
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 1);
         assertEquals(results.first().getEffectiveDate(), now);
@@ -368,13 +385,29 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsOpenFollow() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, null));
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
+
+        assertEquals(results.size(), 0);
+    }
+
+    // Open with no previous event (only at the same time)
+    // -----[X-----------------------------
+    @Test(groups = "fast")
+    public void testCreateNewEventsOpenSameTime() throws CatalogApiException {
+        final DateTime now = clock.getUTCNow();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
+        final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
+
+        disabledDuration.add(new DisabledDuration(now, null));
+        billingEvents.add(createRealEvent(now, subscription1));
+
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 0);
     }
@@ -384,13 +417,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedPrev() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         billingEvents.add(createRealEvent(now.minusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
 
         assertEquals(results.size(), 2);
@@ -409,14 +442,14 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedPrevBetw() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         billingEvents.add(createRealEvent(now.minusDays(1), subscription1));
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 2);
         assertEquals(results.first().getEffectiveDate(), now);
@@ -434,7 +467,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedPrevBetwNext() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
@@ -442,7 +475,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
         billingEvents.add(createRealEvent(now.plusDays(3), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 2);
         assertEquals(results.first().getEffectiveDate(), now);
@@ -460,13 +493,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedBetwn() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 1);
         assertEquals(results.last().getEffectiveDate(), now.plusDays(2));
@@ -479,13 +512,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedBetweenFollow() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         billingEvents.add(createRealEvent(now.plusDays(1), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 1);
         assertEquals(results.last().getEffectiveDate(), now.plusDays(2));
@@ -498,13 +531,13 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     @Test(groups = "fast")
     public void testCreateNewEventsClosedFollow() throws CatalogApiException {
         final DateTime now = clock.getUTCNow();
-        final List<DisabledDuration> disabledDuration = new ArrayList<BlockingCalculator.DisabledDuration>();
+        final List<DisabledDuration> disabledDuration = new ArrayList<DisabledDuration>();
         final SortedSet<BillingEvent> billingEvents = new TreeSet<BillingEvent>();
 
         disabledDuration.add(new DisabledDuration(now, now.plusDays(2)));
         billingEvents.add(createRealEvent(now.plusDays(3), subscription1));
 
-        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, subscription1, internalCallContext);
+        final SortedSet<BillingEvent> results = blockingCalculator.createNewEvents(disabledDuration, billingEvents, internalCallContext);
 
         assertEquals(results.size(), 0);
     }
@@ -520,10 +553,10 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         events.add(createRealEvent(now.minusDays(5), subscription1));
         events.add(createRealEvent(now.minusDays(1), subscription1));
 
-        final BillingEvent minus11 = blockingCalculator.precedingBillingEventForSubscription(now.minusDays(11), events, subscription1);
+        final BillingEvent minus11 = blockingCalculator.precedingBillingEventForSubscription(now.minusDays(11), events);
         assertNull(minus11);
 
-        final BillingEvent minus5andAHalf = blockingCalculator.precedingBillingEventForSubscription(now.minusDays(5).minusHours(12), events, subscription1);
+        final BillingEvent minus5andAHalf = blockingCalculator.precedingBillingEventForSubscription(now.minusDays(5).minusHours(12), events);
         assertNotNull(minus5andAHalf);
         assertEquals(minus5andAHalf.getEffectiveDate(), now.minusDays(6));
 
@@ -594,7 +627,7 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         final DateTime now = clock.getUTCNow();
         final BillingEvent event = new MockBillingEvent();
 
-        final BillingEvent result = blockingCalculator.createNewDisableEvent(now, event, null, internalCallContext);
+        final BillingEvent result = blockingCalculator.createNewDisableEvent(now, event, null);
         assertEquals(result.getBillCycleDayLocal(), event.getBillCycleDayLocal());
         assertEquals(result.getEffectiveDate(), now);
         assertEquals(result.getPlanPhase(), event.getPlanPhase());
@@ -759,6 +792,41 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
     }
 
     @Test(groups = "fast")
+    public void testCreateAndMergeDisablePairs() {
+        final List<BlockingState> blockingEvents = new ArrayList<BlockingState>();
+        final UUID ovdId = UUID.randomUUID();
+        final DateTime entitlementStartDate = clock.getUTCNow();
+        final DateTime blockEffectiveDate = entitlementStartDate.plusSeconds(1);
+        final DateTime unblockEffectiveDate = blockEffectiveDate.plusDays(2);
+
+        // Similar to an entitlement start event
+        blockingEvents.add(new DefaultBlockingState(ovdId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, entitlementStartDate));
+        List<DisabledDuration> pairs = blockingCalculator.createBlockingDurations(blockingEvents);
+        assertEquals(pairs.size(), 0);
+
+        blockingEvents.add(new DefaultBlockingState(ovdId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, blockEffectiveDate));
+
+        pairs = blockingCalculator.createBlockingDurations(blockingEvents);
+        assertEquals(pairs.size(), 1);
+        assertEquals(pairs.get(0).getStart().compareTo(blockEffectiveDate), 0);
+        assertNull(pairs.get(0).getEnd());
+
+        blockingEvents.add(new DefaultBlockingState(ovdId, BlockingStateType.SUBSCRIPTION_BUNDLE, CLEAR_BUNDLE, "test", false, false, false, unblockEffectiveDate));
+
+        pairs = blockingCalculator.createBlockingDurations(blockingEvents);
+        assertEquals(pairs.size(), 1);
+        assertEquals(pairs.get(0).getStart().compareTo(blockEffectiveDate), 0);
+        assertEquals(pairs.get(0).getEnd().compareTo(unblockEffectiveDate), 0);
+
+        blockingEvents.add(new DefaultBlockingState(ovdId, BlockingStateType.SUBSCRIPTION_BUNDLE, DISABLED_BUNDLE, "test", true, true, true, unblockEffectiveDate));
+
+        pairs = blockingCalculator.createBlockingDurations(blockingEvents);
+        assertEquals(pairs.size(), 1);
+        assertEquals(pairs.get(0).getStart().compareTo(blockEffectiveDate), 0);
+        assertNull(pairs.get(0).getEnd());
+    }
+
+    @Test(groups = "fast")
     public void testSimpleWithClearBlockingDuration() throws Exception {
 
         final BillingEvent trial = createRealEvent(new LocalDate(2012, 5, 1).toDateTimeAtStartOfDay(DateTimeZone.UTC), subscription1, SubscriptionBaseTransitionType.CREATE);
@@ -793,6 +861,5 @@ public class TestBlockingCalculator extends JunctionTestSuiteNoDB {
         assertEquals(events.get(3).getEffectiveDate(), new LocalDate(2012, 7, 25).toDateTimeAtStartOfDay(DateTimeZone.UTC));
         assertEquals(events.get(3).getTransitionType(), SubscriptionBaseTransitionType.END_BILLING_DISABLED);
         assertEquals(events.get(4).getEffectiveDate(), new LocalDate(2012, 7, 25).toDateTimeAtStartOfDay(DateTimeZone.UTC));
-        assertEquals(events.get(4).getTransitionType(), SubscriptionBaseTransitionType.CHANGE);
-    }
+        assertEquals(events.get(4).getTransitionType(), SubscriptionBaseTransitionType.CHANGE);    }
 }
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingStateService.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingStateService.java
new file mode 100644
index 0000000..0013ab8
--- /dev/null
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestBlockingStateService.java
@@ -0,0 +1,399 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.junction.plumbing.billing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.killbill.billing.entitlement.api.BlockingState;
+import org.killbill.billing.entitlement.api.BlockingStateType;
+import org.killbill.billing.junction.DefaultBlockingState;
+import org.killbill.billing.junction.JunctionTestSuiteNoDB;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class TestBlockingStateService extends JunctionTestSuiteNoDB {
+
+
+    private UUID accountId;
+    private UUID bundleId;
+    private UUID subscriptionId;
+
+    @BeforeMethod(groups = "fast")
+    public void beforeMethod() throws Exception {
+       super.beforeMethod();
+        this.accountId = UUID.randomUUID();
+        this.bundleId = UUID.randomUUID();
+        this.subscriptionId = UUID.randomUUID();
+    }
+
+    //
+    // In all tests:
+    // * Events are B(locked) or U(nblocked)
+    // * Types are (A(ccount), B(undle), S(ubscription))
+
+    //             B    B     U     U
+    //             |----|-----|-----|
+    //             A    B     A     B
+    //
+    //  Expected:  B----------------U
+    //
+    @Test(groups = "fast")
+    public void testInterlaceTypes() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, true, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(2)));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, false, testInit.plusDays(3)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(2)),
+                                                                 new DisabledDuration(testInit.plusDays(1), testInit.plusDays(3)));
+
+        verify(result, expected);
+    }
+
+
+    //               B    B     U
+    //               |----|-----|-----
+    //               A    B     A
+    //
+    // Expected:     B-------------------
+    //
+    @Test(groups = "fast")
+    public void testInterlaceTypesWithNoEnd() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, true, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(2)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(2)),
+                                                                 new DisabledDuration(testInit.plusDays(1), null));
+
+        verify(result, expected);
+    }
+
+    //             B    U     B     U
+    //             |----|-----|-----|
+    //             A    A     A     A
+    //
+    //  Expected:  B----------------U
+    //
+    @Test(groups = "fast")
+    public void testMultipleDisabledDurations() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit.plusDays(2)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(3)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(1)),
+                                                                 new DisabledDuration(testInit.plusDays(2), testInit.plusDays(3)));
+
+        verify(result, expected);
+    }
+
+
+    //             B    U     U
+    //             |----|-----|
+    //             AB   B     A
+    //
+    //  Expected:  B----------U
+    //
+    @Test(groups = "fast")
+    public void testSameBlockingDates() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, false, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(2)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(1)),
+                                                                 new DisabledDuration(testInit, testInit.plusDays(2)));
+
+        verify(result, expected);
+    }
+
+
+    //             BU
+    //             |
+    //             AA
+    //
+    //  Expected:  None
+    //
+    @Test(groups = "fast")
+    public void testSameBlockingUnblockingDates() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of();
+
+        verify(result, expected);
+    }
+
+
+    //             B U
+    //             |-|
+    //             A A
+    //
+    //  Expected:  None
+    //
+    @Test(groups = "fast")
+    public void testBlockingUnblockingDatesLessThanADay1() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusHours(10)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of();
+
+        verify(result, expected);
+    }
+
+
+    //              B       BU
+    //              |-------|
+    //              A       AA
+    //
+    //  Expected:   B--------
+    //
+    @Test(groups = "fast")
+    public void testBlockingUnblockingDatesLessThanADay2() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit.plusDays(1)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(1)),
+                                                                 new DisabledDuration(testInit.plusDays(1), null));
+
+        verify(result, expected);
+    }
+
+
+    //              B       BU
+    //              |-------|
+    //              A       AA
+    //
+    //  Expected:   B--------
+    //
+    @Test(groups = "fast")
+    public void testBlockingUnblockingDatesLessThanADay3() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(1)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(1)));
+
+        verify(result, expected);
+    }
+
+
+    //              B       UB
+    //              |-------|
+    //              A       AA
+    //
+    //  Expected:   B--------
+    //
+    @Test(groups = "fast")
+    public void testBlockingUnblockingDatesLessThanADay4() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, false, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit.plusDays(1)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit, testInit.plusDays(1)),
+                                                                 new DisabledDuration(testInit.plusDays(1), null));
+
+        verify(result, expected);
+    }
+
+
+    //              U       B    B
+    //              |-------|----|
+    //              B       A    B
+    //
+    //  Expected:           B--------
+    //
+    @Test(groups = "fast")
+    public void testStartingWithUnblock() throws Exception {
+
+        final List<BlockingState> input = new ArrayList<BlockingState>();
+
+        final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
+        final DateTime testInit = new DateTime(2017, 04, 29, 14, 15, 53, tz);
+        clock.setTime(testInit);
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, false, testInit));
+        input.add(createBillingBlockingState(BlockingStateType.ACCOUNT, true, testInit.plusDays(1)));
+        input.add(createBillingBlockingState(BlockingStateType.SUBSCRIPTION_BUNDLE, true, testInit.plusDays(2)));
+
+        final BlockingStateService test = new BlockingStateService();
+        for (BlockingState cur : input) {
+            test.addBlockingState(cur);
+        }
+        final List<DisabledDuration> result = test.build();
+
+        final List<DisabledDuration> expected = ImmutableList.of(new DisabledDuration(testInit.plusDays(2), null),
+                                                                 new DisabledDuration(testInit.plusDays(1), null));
+
+        verify(result, expected);
+    }
+
+
+
+
+    private void verify(final List<DisabledDuration> actual, final List<DisabledDuration> expected) {
+        assertEquals(expected.size(), actual.size());
+        for (int i = 0; i < actual.size(); i++) {
+            boolean found = false;
+            for (int j = 0; j < expected.size(); j++) {
+                if (actual.get(i).equals(expected.get(j))) {
+                    found = true;
+                    break;
+                }
+            }
+            assertTrue(found);
+        }
+    }
+
+    private BlockingState createBillingBlockingState(final BlockingStateType type, final boolean blockBilling, final DateTime effectiveDate) {
+        final UUID blockedId;
+        switch(type) {
+            case ACCOUNT:
+                blockedId = accountId;
+                break;
+            case SUBSCRIPTION_BUNDLE:
+                blockedId = bundleId;
+                break;
+            case SUBSCRIPTION:
+                blockedId = subscriptionId;
+                break;
+            default:
+                throw new IllegalStateException("Unexpexted type");
+        }
+        return new DefaultBlockingState(blockedId, type, UUID.randomUUID().toString(), "SVC", false, false, blockBilling, effectiveDate);
+    }
+
+}
\ No newline at end of file
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDefaultInternalBillingApi.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDefaultInternalBillingApi.java
index f72fb2f..c8c99f1 100644
--- a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDefaultInternalBillingApi.java
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDefaultInternalBillingApi.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -20,17 +22,14 @@ import java.util.List;
 
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
-import org.killbill.billing.payment.api.PluginProperty;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
+import org.joda.time.Period;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.api.BillingPeriod;
 import org.killbill.billing.catalog.api.PlanPhaseSpecifier;
 import org.killbill.billing.catalog.api.PriceListSet;
-import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.EntitlementService;
 import org.killbill.billing.entitlement.api.BlockingStateType;
 import org.killbill.billing.entitlement.api.DefaultEntitlementApi;
@@ -38,8 +37,11 @@ import org.killbill.billing.entitlement.api.Entitlement;
 import org.killbill.billing.junction.BillingEvent;
 import org.killbill.billing.junction.DefaultBlockingState;
 import org.killbill.billing.junction.JunctionTestSuiteWithEmbeddedDB;
+import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 
@@ -49,7 +51,7 @@ public class TestDefaultInternalBillingApi extends JunctionTestSuiteWithEmbedded
     // The invocationCount > 0 was to trigger an issue where events would come out-of-order randomly.
     // While the bug shouldn't occur anymore, we're keeping it just in case (the test will also try to insert the events out-of-order manually).
     // This test also checks we don't generate billing events for blocking durations less than a day (https://github.com/killbill/killbill/issues/267).
-    @Test(groups = "slow", description = "Check blocking states with same effective date are correctly handled", invocationCount = 10, enabled = false)
+    @Test(groups = "slow", description = "Check blocking states with same effective date are correctly handled", invocationCount = 10)
     public void testBlockingStatesWithSameEffectiveDate() throws Exception {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
         clock.setDay(initialDate);
@@ -187,6 +189,15 @@ public class TestDefaultInternalBillingApi extends JunctionTestSuiteWithEmbedded
     // See https://github.com/killbill/killbill/commit/92042843e38a67f75495b207385e4c1f9ca60990#commitcomment-4749967
     @Test(groups = "slow", description = "Check unblock then block states with same effective date are correctly handled", invocationCount = 10)
     public void testUnblockThenBlockBlockingStatesWithSameEffectiveDate() throws Exception {
+        testUnblockThenBlockBlockingStatesWithSimilarEffectiveDate(Seconds.ZERO);
+    }
+
+    @Test(groups = "slow", description = "Check unblock then block states with almost the same effective date are correctly handled", invocationCount = 10)
+    public void testUnblockThenBlockBlockingStatesWithAlmostSameEffectiveDate() throws Exception {
+        testUnblockThenBlockBlockingStatesWithSimilarEffectiveDate(Seconds.ONE);
+    }
+
+    private void testUnblockThenBlockBlockingStatesWithSimilarEffectiveDate(final ReadablePeriod delay) throws Exception {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
         clock.setDay(initialDate);
 
@@ -198,7 +209,10 @@ public class TestDefaultInternalBillingApi extends JunctionTestSuiteWithEmbedded
         final SubscriptionBase subscription = subscriptionInternalApi.getSubscriptionFromId(entitlement.getId(), internalCallContext);
         assertListenerStatus();
 
-        final DateTime block1Date = clock.getUTCNow();
+        final DateTime block1Date = subscription.getStartDate().plus(delay);
+        // Make sure to update the clock here, because we don't disable for periods less than a day
+        clock.setTime(block1Date);
+
         testListener.pushExpectedEvents(NextEvent.BLOCK);
         final DefaultBlockingState state1 = new DefaultBlockingState(account.getId(),
                                                                      BlockingStateType.ACCOUNT,
@@ -239,13 +253,17 @@ public class TestDefaultInternalBillingApi extends JunctionTestSuiteWithEmbedded
         clock.addDays(3);
         assertListenerStatus();
 
-        // Expected blocking duration:
-        // * 2013-08-07 to now [2013-08-07 to 2013-08-08 then 2013-08-08 to now]
         final List<BillingEvent> events = ImmutableList.<BillingEvent>copyOf(billingInternalApi.getBillingEventsForAccountAndUpdateAccountBCD(account.getId(), null, internalCallContext));
-        Assert.assertEquals(events.size(), 2);
-        Assert.assertEquals(events.get(0).getTransitionType(), SubscriptionBaseTransitionType.CREATE);
-        Assert.assertEquals(events.get(0).getEffectiveDate(), subscription.getStartDate());
-        Assert.assertEquals(events.get(1).getTransitionType(), SubscriptionBaseTransitionType.START_BILLING_DISABLED);
-        Assert.assertEquals(events.get(1).getEffectiveDate(), block1Date);
+        if (delay.toPeriod().toStandardDuration().compareTo(Period.ZERO.toStandardDuration()) == 0) {
+            Assert.assertEquals(events.size(), 0);
+        } else {
+            // Expected blocking duration:
+            // * 2013-08-07 to now [2013-08-07 to 2013-08-08 then 2013-08-08 to now]
+            Assert.assertEquals(events.size(), 2);
+            Assert.assertEquals(events.get(0).getTransitionType(), SubscriptionBaseTransitionType.CREATE);
+            Assert.assertEquals(events.get(0).getEffectiveDate(), subscription.getStartDate());
+            Assert.assertEquals(events.get(1).getTransitionType(), SubscriptionBaseTransitionType.START_BILLING_DISABLED);
+            Assert.assertEquals(events.get(1).getEffectiveDate(), block1Date);
+        }
     }
 }
diff --git a/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDisabledDuration.java b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDisabledDuration.java
new file mode 100644
index 0000000..64f323b
--- /dev/null
+++ b/junction/src/test/java/org/killbill/billing/junction/plumbing/billing/TestDisabledDuration.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.junction.plumbing.billing;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.junction.JunctionTestSuiteNoDB;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+public class TestDisabledDuration extends JunctionTestSuiteNoDB {
+
+    @Test(groups = "fast")
+    public void testCompare0() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusHours(10));
+        final DisabledDuration d2 = new DisabledDuration(now, now.plusHours(10));
+        assertEquals(d1.compareTo(d2), 0);
+        assertTrue(d1.equals(d2));
+    }
+
+    @Test(groups = "fast")
+    public void testCompare1() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusHours(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusSeconds(1), now.plusHours(10));
+        assertEquals(d1.compareTo(d2), -1);
+        assertEquals(d2.compareTo(d1), 1);
+    }
+
+    @Test(groups = "fast")
+    public void testCompare2() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusHours(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusSeconds(1), now.plusHours(10));
+        assertEquals(d1.compareTo(d2), -1);
+        assertEquals(d2.compareTo(d1), 1);
+    }
+
+    @Test(groups = "fast")
+    public void testCompare3() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(1));
+        final DisabledDuration d2 = new DisabledDuration(now, now.plusDays(2));
+        assertEquals(d1.compareTo(d2), -1);
+        assertEquals(d2.compareTo(d1), 1);
+    }
+
+    @Test(groups = "fast")
+    public void testCompare4() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(1));
+        final DisabledDuration d2 = new DisabledDuration(now, null);
+        assertEquals(d1.compareTo(d2), -1);
+        assertEquals(d2.compareTo(d1), 1);
+    }
+
+    @Test(groups = "fast")
+    public void testCompare5() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, null);
+        final DisabledDuration d2 = new DisabledDuration(now, now.plusDays(1));
+        assertEquals(d1.compareTo(d2), 1);
+        assertEquals(d2.compareTo(d1), -1);
+    }
+
+
+
+    // Case 1: this contained into o => false
+    // |---------|       this
+    // |--------------|  o
+    @Test(groups = "fast")
+    public void testDisjoint1() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(1));
+        final DisabledDuration d2 = new DisabledDuration(now, now.plusDays(2));
+        assertFalse(d1.isDisjoint(d2));
+    }
+
+    // Case 2: this overlaps with o => false
+    // |---------|            this
+    //      |--------------|  o
+    @Test(groups = "fast")
+    public void testDisjoint2() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), now.plusDays(12));
+        assertFalse(d1.isDisjoint(d2));
+    }
+
+    // Case 3: o contains into this => false
+    // |---------| this
+    //      |---|  o
+    @Test(groups = "fast")
+    public void testDisjoint3() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), now.plusDays(4));
+        assertFalse(d1.isDisjoint(d2));
+    }
+
+    // Case 4: this and o are adjacent => false
+    // |---------| this
+    //           |---|  o
+    @Test(groups = "fast")
+    public void testDisjoint4() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(10), now.plusDays(15));
+        assertFalse(d1.isDisjoint(d2));
+    }
+
+    // Case 5: this and o are disjoint => true
+    // |---------| this
+    //             |---|  o
+    @Test(groups = "fast")
+    public void testDisjoint5() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(11), now.plusDays(15));
+        assertTrue(d1.isDisjoint(d2));
+    }
+
+
+    @Test(groups = "fast")
+    public void testMergeDuration1() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), now.plusDays(15));
+
+        final DisabledDuration result = DisabledDuration.mergeDuration(d1, d2);
+        assertEquals(result.getStart().compareTo(now), 0);
+        assertEquals(result.getEnd().compareTo(now.plusDays(15)), 0);
+    }
+
+    @Test(groups = "fast")
+    public void testMergeDuration2() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(15));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), now.plusDays(10));
+
+        final DisabledDuration result = DisabledDuration.mergeDuration(d1, d2);
+        assertEquals(result.getStart().compareTo(now), 0);
+        assertEquals(result.getEnd().compareTo(now.plusDays(15)), 0);
+    }
+
+    @Test(groups = "fast")
+    public void testMergeDuration3() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, null);
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), now.plusDays(10));
+
+        final DisabledDuration result = DisabledDuration.mergeDuration(d1, d2);
+        assertEquals(result.getStart().compareTo(now), 0);
+        assertNull(result.getEnd());
+    }
+
+    @Test(groups = "fast")
+    public void testMergeDuration4() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(1), null);
+
+        final DisabledDuration result = DisabledDuration.mergeDuration(d1, d2);
+        assertEquals(result.getStart().compareTo(now), 0);
+        assertNull(result.getEnd());
+    }
+
+    @Test(groups = "fast", expectedExceptions = IllegalStateException.class)
+    public void testMergeDurationInvalid1() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now, now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now.plusDays(11), null);
+
+        DisabledDuration.mergeDuration(d1, d2);
+    }
+
+    @Test(groups = "fast", expectedExceptions = IllegalStateException.class)
+    public void testMergeDurationInvalid2() throws Exception {
+        final DateTime now = clock.getUTCNow();
+        final DisabledDuration d1 = new DisabledDuration(now.plusDays(1), now.plusDays(10));
+        final DisabledDuration d2 = new DisabledDuration(now, null);
+
+        DisabledDuration.mergeDuration(d1, d2);
+    }
+
+}
\ No newline at end of file
diff --git a/subscription/src/test/java/org/killbill/billing/subscription/api/user/TestUserApiChangePlan.java b/subscription/src/test/java/org/killbill/billing/subscription/api/user/TestUserApiChangePlan.java
index 613bb69..57d5026 100644
--- a/subscription/src/test/java/org/killbill/billing/subscription/api/user/TestUserApiChangePlan.java
+++ b/subscription/src/test/java/org/killbill/billing/subscription/api/user/TestUserApiChangePlan.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -16,12 +18,17 @@
 
 package org.killbill.billing.subscription.api.user;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.LocalDate;
 import org.killbill.billing.ErrorCode;
 import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.catalog.api.BillingActionPolicy;
 import org.killbill.billing.catalog.api.BillingPeriod;
 import org.killbill.billing.catalog.api.Duration;
 import org.killbill.billing.catalog.api.PhaseType;
@@ -49,14 +56,8 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -477,7 +478,6 @@ public class TestUserApiChangePlan extends SubscriptionTestSuiteWithEmbeddedDB {
         }
     }
 
-
     @Test(groups = "slow")
     public void testChangePlanOnPendingSubscription() throws SubscriptionBaseApiException {
 
@@ -559,4 +559,40 @@ public class TestUserApiChangePlan extends SubscriptionTestSuiteWithEmbeddedDB {
         assertEquals(subscription2.getEvents().size(), subscription.getEvents().size());
     }
 
+    @Test(groups = "slow")
+    public void testChangePlanOnCreate() throws SubscriptionBaseApiException {
+        final DefaultSubscriptionBase subscription = testUtil.createSubscription(bundle, "Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
+
+        // CHANGE PLAN IMMEDIATELY: the CHANGE event will be transformed into a CREATE
+        testListener.pushExpectedEvent(NextEvent.CREATE);
+        subscription.changePlanWithDate(new PlanSpecifier("Pistol", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME), null, subscription.getStartDate(), callContext);
+        assertListenerStatus();
+
+        checkChangePlan(subscription, "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, PhaseType.TRIAL);
+
+        final SubscriptionBase refreshedSubscription = subscriptionInternalApi.getSubscriptionFromId(subscription.getId(), internalCallContext);
+        assertEquals(refreshedSubscription.getAllTransitions().size(), 2);
+        assertEquals(refreshedSubscription.getAllTransitions().get(0).getTransitionType(), SubscriptionBaseTransitionType.CREATE);
+        assertEquals(refreshedSubscription.getAllTransitions().get(1).getTransitionType(), SubscriptionBaseTransitionType.PHASE);
+    }
+
+    @Test(groups = "slow")
+    public void testChangePlanRightAfterCreate() throws SubscriptionBaseApiException {
+        final DefaultSubscriptionBase subscription = testUtil.createSubscription(bundle, "Shotgun", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME);
+
+        clock.setTime(clock.getUTCNow().plusSeconds(1));
+
+        // CHANGE PLAN ALMOST IMMEDIATELY
+        testListener.pushExpectedEvent(NextEvent.CHANGE);
+        subscription.changePlan(new PlanSpecifier("Pistol", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME), null, callContext);
+        assertListenerStatus();
+
+        checkChangePlan(subscription, "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, PhaseType.TRIAL);
+
+        final SubscriptionBase refreshedSubscription = subscriptionInternalApi.getSubscriptionFromId(subscription.getId(), internalCallContext);
+        assertEquals(refreshedSubscription.getAllTransitions().size(), 3);
+        assertEquals(refreshedSubscription.getAllTransitions().get(0).getTransitionType(), SubscriptionBaseTransitionType.CREATE);
+        assertEquals(refreshedSubscription.getAllTransitions().get(1).getTransitionType(), SubscriptionBaseTransitionType.CHANGE);
+        assertEquals(refreshedSubscription.getAllTransitions().get(2).getTransitionType(), SubscriptionBaseTransitionType.PHASE);
+    }
 }