killbill-memoizeit

invoice: implement mechanism to park accounts Details at

12/22/2016 1:18:52 PM

Details

diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index a9aa2dc..8f56d81 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -22,6 +22,7 @@ import org.killbill.billing.glue.InvoiceModule;
 import org.killbill.billing.invoice.InvoiceDispatcher;
 import org.killbill.billing.invoice.InvoiceListener;
 import org.killbill.billing.invoice.InvoiceTagHandler;
+import org.killbill.billing.invoice.ParkedAccountsManager;
 import org.killbill.billing.invoice.api.DefaultInvoiceService;
 import org.killbill.billing.invoice.api.InvoiceApiHelper;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
@@ -164,5 +165,6 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
         installResourceBundleFactory();
         bind(RawUsageOptimizer.class).asEagerSingleton();
         bind(InvoiceApiHelper.class).asEagerSingleton();
+        bind(ParkedAccountsManager.class).asEagerSingleton();
     }
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index cd2c7da..4ad8b4f 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -93,6 +93,7 @@ import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
 import org.killbill.billing.util.UUIDs;
+import org.killbill.billing.util.api.TagApiException;
 import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.TenantContext;
@@ -144,6 +145,7 @@ public class InvoiceDispatcher {
     private final Clock clock;
     private final NotificationQueueService notificationQueueService;
     private final InvoiceConfig invoiceConfig;
+    private final ParkedAccountsManager parkedAccountsManager;
 
     @Inject
     public InvoiceDispatcher(final InvoiceGenerator generator,
@@ -158,7 +160,8 @@ public class InvoiceDispatcher {
                              final PersistentBus eventBus,
                              final NotificationQueueService notificationQueueService,
                              final InvoiceConfig invoiceConfig,
-                             final Clock clock) {
+                             final Clock clock,
+                             final ParkedAccountsManager parkedAccountsManager) {
         this.generator = generator;
         this.billingApi = billingApi;
         this.subscriptionApi = SubscriptionApi;
@@ -172,6 +175,7 @@ public class InvoiceDispatcher {
         this.clock = clock;
         this.notificationQueueService = notificationQueueService;
         this.invoiceConfig = invoiceConfig;
+        this.parkedAccountsManager = parkedAccountsManager;
     }
 
     public void processSubscriptionForInvoiceGeneration(final EffectiveSubscriptionInternalEvent transition,
@@ -215,13 +219,34 @@ public class InvoiceDispatcher {
         }
     }
 
-    public Invoice processAccount(final UUID accountId, @Nullable final LocalDate targetDate,
-                                  @Nullable final DryRunArguments dryRunArguments, final InternalCallContext context) throws InvoiceApiException {
+    public Invoice processAccount(final UUID accountId,
+                                  @Nullable final LocalDate targetDate,
+                                  @Nullable final DryRunArguments dryRunArguments,
+                                  final InternalCallContext context) throws InvoiceApiException {
+        return processAccount(false, accountId, targetDate, dryRunArguments, context);
+    }
+
+    public Invoice processAccount(final boolean isApiCall,
+                                  final UUID accountId,
+                                  @Nullable final LocalDate targetDate,
+                                  @Nullable final DryRunArguments dryRunArguments,
+                                  final InternalCallContext context) throws InvoiceApiException {
+        boolean parkedAccount = false;
+        try {
+            parkedAccount = parkedAccountsManager.isParked(accountId, context);
+            if (parkedAccount && !isApiCall) {
+                log.warn("Ignoring invoice generation process for accountId='{}', targetDate='{}', account is parked", accountId.toString(), targetDate);
+                return null;
+            }
+        } catch (final TagApiException e) {
+            log.warn("Unable to determine parking state for accountId='{}'", accountId);
+        }
+
         GlobalLock lock = null;
         try {
             lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), accountId.toString(), invoiceConfig.getMaxGlobalLockRetries());
 
-            return processAccountWithLock(accountId, targetDate, dryRunArguments, context);
+            return processAccountWithLock(parkedAccount, accountId, targetDate, dryRunArguments, context);
         } catch (final LockFailedException e) {
             log.warn("Failed to process invoice for accountId='{}', targetDate='{}'", accountId.toString(), targetDate, e);
         } finally {
@@ -232,8 +257,11 @@ public class InvoiceDispatcher {
         return null;
     }
 
-    private Invoice processAccountWithLock(final UUID accountId, @Nullable final LocalDate inputTargetDateMaybeNull,
-                                           @Nullable final DryRunArguments dryRunArguments, final InternalCallContext context) throws InvoiceApiException {
+    private Invoice processAccountWithLock(final boolean parkedAccount,
+                                           final UUID accountId,
+                                           @Nullable final LocalDate inputTargetDateMaybeNull,
+                                           @Nullable final DryRunArguments dryRunArguments,
+                                           final InternalCallContext context) throws InvoiceApiException {
         final boolean isDryRun = dryRunArguments != null;
         final boolean upcomingInvoiceDryRun = isDryRun && DryRunType.UPCOMING_INVOICE.equals(dryRunArguments.getDryRunType());
 
@@ -258,6 +286,16 @@ public class InvoiceDispatcher {
                 final Invoice invoice = processAccountWithLockAndInputTargetDate(accountId, curTargetDate, billingEvents, isDryRun, context);
                 if (invoice != null) {
                     filterInvoiceItemsForDryRun(filteredSubscriptionIdsForDryRun, invoice);
+
+                    if (!isDryRun && parkedAccount) {
+                        try {
+                            log.info("Illegal invoicing state fixed for accountId='{}', unparking account", accountId);
+                            parkedAccountsManager.unparkAccount(accountId, context);
+                        } catch (final TagApiException ignored) {
+                            log.warn("Unable to unpark account", ignored);
+                        }
+                    }
+
                     return invoice;
                 }
             }
@@ -271,6 +309,16 @@ public class InvoiceDispatcher {
         } catch (final SubscriptionBaseApiException e) {
             log.warn("Failed to retrieve BillingEvents for accountId='{}', dryRunArguments='{}'", accountId, dryRunArguments, e);
             return null;
+        } catch (final InvoiceApiException e) {
+            if (e.getCode() == ErrorCode.UNEXPECTED_ERROR.getCode() && !isDryRun) {
+                log.warn("Illegal invoicing state detected for accountId='{}', dryRunArguments='{}', parking account", accountId, dryRunArguments, e);
+                try {
+                    parkedAccountsManager.parkAccount(accountId, context);
+                } catch (final TagApiException ignored) {
+                    log.warn("Unable to park account", ignored);
+                }
+            }
+            throw e;
         }
     }
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/ParkedAccountsManager.java b/invoice/src/main/java/org/killbill/billing/invoice/ParkedAccountsManager.java
new file mode 100644
index 0000000..c03c993
--- /dev/null
+++ b/invoice/src/main/java/org/killbill/billing/invoice/ParkedAccountsManager.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice;
+
+import java.util.UUID;
+
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.util.api.TagApiException;
+import org.killbill.billing.util.api.TagDefinitionApiException;
+import org.killbill.billing.util.api.TagUserApi;
+import org.killbill.billing.util.cache.Cachable.CacheType;
+import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.billing.util.tag.Tag;
+import org.killbill.billing.util.tag.dao.TagDefinitionDao;
+import org.killbill.billing.util.tag.dao.TagDefinitionModelDao;
+import org.killbill.clock.Clock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.inject.Inject;
+
+public class ParkedAccountsManager {
+
+    @VisibleForTesting
+    static final String PARK = "__PARK__";
+
+    private final TagUserApi tagUserApi;
+    private final TagDefinitionDao tagDefinitionDao;
+    private final NonEntityDao nonEntityDao;
+    private final CacheControllerDispatcher cacheControllerDispatcher;
+    private /* final */ UUID tagDefinitionId;
+
+    @Inject
+    public ParkedAccountsManager(final TagUserApi tagUserApi,
+                                 final TagDefinitionDao tagDefinitionDao,
+                                 final NonEntityDao nonEntityDao,
+                                 final CacheControllerDispatcher cacheControllerDispatcher,
+                                 final Clock clock) throws TagDefinitionApiException {
+        this.tagUserApi = tagUserApi;
+        this.tagDefinitionDao = tagDefinitionDao;
+        this.nonEntityDao = nonEntityDao;
+        this.cacheControllerDispatcher = cacheControllerDispatcher;
+
+        retrieveOrCreateParkTagDefinition(clock);
+    }
+
+    public void parkAccount(final UUID accountId, final InternalCallContext internalCallContext) throws TagApiException {
+        final CallContext callContext = createCallContext(internalCallContext);
+        tagUserApi.addTag(accountId, ObjectType.ACCOUNT, tagDefinitionId, callContext);
+    }
+
+    public void unparkAccount(final UUID accountId, final InternalCallContext internalCallContext) throws TagApiException {
+        final CallContext callContext = createCallContext(internalCallContext);
+        tagUserApi.removeTag(accountId, ObjectType.ACCOUNT, tagDefinitionId, callContext);
+    }
+
+    public boolean isParked(final UUID accountId, final InternalCallContext internalCallContext) throws TagApiException {
+        final CallContext callContext = createCallContext(internalCallContext);
+        return Iterables.<Tag>tryFind(tagUserApi.getTagsForAccount(accountId, false, callContext),
+                                      new Predicate<Tag>() {
+                                          @Override
+                                          public boolean apply(final Tag input) {
+                                              return tagDefinitionId.equals(input.getTagDefinitionId());
+                                          }
+                                      }).orNull() != null;
+    }
+
+    // TODO Consider creating a tag internal API to avoid this
+    private CallContext createCallContext(final InternalCallContext internalCallContext) {
+        final UUID tenantId = nonEntityDao.retrieveIdFromObject(internalCallContext.getTenantRecordId(),
+                                                                ObjectType.TENANT,
+                                                                cacheControllerDispatcher.getCacheController(CacheType.OBJECT_ID));
+        return internalCallContext.toCallContext(tenantId);
+    }
+
+    @VisibleForTesting
+    void retrieveOrCreateParkTagDefinition(final Clock clock) throws TagDefinitionApiException {
+        final InternalCallContext callContext = new InternalCallContext(InternalCallContextFactory.INTERNAL_TENANT_RECORD_ID,
+                                                                        null,
+                                                                        null,
+                                                                        null,
+                                                                        UUID.randomUUID(),
+                                                                        ParkedAccountsManager.class.getName(),
+                                                                        CallOrigin.INTERNAL,
+                                                                        UserType.SYSTEM,
+                                                                        null,
+                                                                        null,
+                                                                        clock.getUTCNow(),
+                                                                        clock.getUTCNow());
+        // Need to use the DAO directly to bypass validations
+        TagDefinitionModelDao tagDefinitionModelDao = tagDefinitionDao.getByName(PARK, callContext);
+        if (tagDefinitionModelDao == null) {
+            tagDefinitionModelDao = tagDefinitionDao.create(PARK, "Accounts with invalid invoicing state", callContext);
+        }
+        this.tagDefinitionId = tagDefinitionModelDao.getId();
+    }
+}
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModule.java b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModule.java
index 049446e..bc4ddeb 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModule.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModule.java
@@ -60,7 +60,6 @@ public class TestInvoiceModule extends DefaultInvoiceModule {
         install(new MockTenantModule(configSource));
 
 
-        install(new TagStoreModule(configSource));
         install(new CustomFieldModule(configSource));
         install(new UsageModule(configSource));
         installExternalApis();
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleNoDB.java b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleNoDB.java
index cf36e03..278a808 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleNoDB.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleNoDB.java
@@ -37,6 +37,7 @@ import org.killbill.billing.invoice.dao.MockInvoiceDao;
 import org.killbill.billing.mock.api.MockAccountUserApi;
 import org.killbill.billing.mock.glue.MockAccountModule;
 import org.killbill.billing.mock.glue.MockNonEntityDaoModule;
+import org.killbill.billing.mock.glue.MockTagModule;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.mockito.Mockito;
 
@@ -55,7 +56,7 @@ public class TestInvoiceModuleNoDB extends TestInvoiceModule {
         super.configure();
         install(new GuicyKillbillTestNoDBModule(configSource));
         install(new MockNonEntityDaoModule(configSource));
-
+        install(new MockTagModule(configSource));
         install(new MockAccountModule(configSource));
 
         installCurrencyConversionApi();
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleWithEmbeddedDb.java b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleWithEmbeddedDb.java
index 0e161e0..626a257 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleWithEmbeddedDb.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/glue/TestInvoiceModuleWithEmbeddedDb.java
@@ -25,6 +25,7 @@ import org.killbill.billing.invoice.InvoiceListener;
 import org.killbill.billing.invoice.TestInvoiceNotificationQListener;
 import org.killbill.billing.platform.api.KillbillConfigSource;
 import org.killbill.billing.util.glue.NonEntityDaoModule;
+import org.killbill.billing.util.glue.TagStoreModule;
 import org.mockito.Mockito;
 
 public class TestInvoiceModuleWithEmbeddedDb extends TestInvoiceModule {
@@ -45,6 +46,7 @@ public class TestInvoiceModuleWithEmbeddedDb extends TestInvoiceModule {
         install(new DefaultAccountModule(configSource));
         install(new GuicyKillbillTestWithEmbeddedDBModule(configSource));
         install(new NonEntityDaoModule(configSource));
+        install(new TagStoreModule(configSource));
 
         bind(CurrencyConversionApi.class).toInstance(Mockito.mock(CurrencyConversionApi.class));
     }
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java b/invoice/src/test/java/org/killbill/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
index 487fd07..f5eceaa 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/InvoiceTestSuiteWithEmbeddedDB.java
@@ -109,6 +109,8 @@ public abstract class InvoiceTestSuiteWithEmbeddedDB extends GuicyKillbillTestSu
     protected InvoicePluginDispatcher invoicePluginDispatcher;
     @Inject
     protected InvoiceConfig invoiceConfig;
+    @Inject
+    protected ParkedAccountsManager parkedAccountsManager;
 
     @Override
     protected KillbillConfigSource getConfigSource() {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceDispatcher.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceDispatcher.java
index cfedd3c..f22a72b 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceDispatcher.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceDispatcher.java
@@ -24,8 +24,10 @@ import java.util.UUID;
 
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
+import org.killbill.billing.ErrorCode;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
+import org.killbill.billing.callcontext.DefaultTenantContext;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.catalog.MockPlan;
 import org.killbill.billing.catalog.MockPlanPhase;
@@ -36,6 +38,8 @@ import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.catalog.api.PhaseType;
 import org.killbill.billing.catalog.api.Plan;
 import org.killbill.billing.catalog.api.PlanPhase;
+import org.killbill.billing.invoice.InvoiceDispatcher.FutureAccountNotifications;
+import org.killbill.billing.invoice.InvoiceDispatcher.FutureAccountNotifications.SubscriptionNotification;
 import org.killbill.billing.invoice.TestInvoiceHelper.DryRunFutureDateArguments;
 import org.killbill.billing.invoice.api.DryRunArguments;
 import org.killbill.billing.invoice.api.Invoice;
@@ -43,17 +47,26 @@ import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceItem;
 import org.killbill.billing.invoice.api.InvoiceItemType;
 import org.killbill.billing.invoice.api.InvoiceNotifier;
+import org.killbill.billing.invoice.dao.InvoiceItemModelDao;
 import org.killbill.billing.invoice.dao.InvoiceModelDao;
 import org.killbill.billing.invoice.notification.NullInvoiceNotifier;
 import org.killbill.billing.junction.BillingEventSet;
 import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
+import org.killbill.billing.util.api.TagDefinitionApiException;
+import org.killbill.billing.util.tag.Tag;
+import org.killbill.billing.util.tag.TagDefinition;
 import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 public class TestInvoiceDispatcher extends InvoiceTestSuiteWithEmbeddedDB {
 
     private Account account;
@@ -67,6 +80,8 @@ public class TestInvoiceDispatcher extends InvoiceTestSuiteWithEmbeddedDB {
         account = invoiceUtil.createAccount(callContext);
         subscription = invoiceUtil.createSubscription();
         context = internalCallContextFactory.createInternalCallContext(account.getId(), callContext);
+        // Tests will delete the database entries, yet ParkedAccountsManager is injected once
+        parkedAccountsManager.retrieveOrCreateParkTagDefinition(clock);
     }
 
     @Test(groups = "slow")
@@ -90,7 +105,7 @@ public class TestInvoiceDispatcher extends InvoiceTestSuiteWithEmbeddedDB {
         final InvoiceNotifier invoiceNotifier = new NullInvoiceNotifier();
         final InvoiceDispatcher dispatcher = new InvoiceDispatcher(generator, accountApi, billingApi, subscriptionApi, invoiceDao,
                                                                    internalCallContextFactory, invoiceNotifier, invoicePluginDispatcher, locker, busService.getBus(),
-                                                                   null, invoiceConfig, clock);
+                                                                   null, invoiceConfig, clock, parkedAccountsManager);
 
         Invoice invoice = dispatcher.processAccount(accountId, target, new DryRunFutureDateArguments(), context);
         Assert.assertNotNull(invoice);
@@ -114,6 +129,150 @@ public class TestInvoiceDispatcher extends InvoiceTestSuiteWithEmbeddedDB {
     }
 
     @Test(groups = "slow")
+    public void testWithParking() throws InvoiceApiException, AccountApiException, CatalogApiException, SubscriptionBaseApiException, TagDefinitionApiException {
+        final UUID accountId = account.getId();
+
+        final BillingEventSet events = new MockBillingEventSet();
+        final Plan plan = MockPlan.createBicycleNoTrialEvergreen1USD();
+        final PlanPhase planPhase = MockPlanPhase.create1USDMonthlyEvergreen();
+        final DateTime effectiveDate = clock.getUTCNow().minusDays(1);
+        final Currency currency = Currency.USD;
+        final BigDecimal fixedPrice = null;
+        events.add(invoiceUtil.createMockBillingEvent(account, subscription, effectiveDate, plan, planPhase,
+                                                      fixedPrice, BigDecimal.ONE, currency, BillingPeriod.MONTHLY, 1,
+                                                      BillingMode.IN_ADVANCE, "", 1L, SubscriptionBaseTransitionType.CREATE));
+
+        Mockito.when(billingApi.getBillingEventsForAccountAndUpdateAccountBCD(Mockito.<UUID>any(), Mockito.<DryRunArguments>any(), Mockito.<InternalCallContext>any())).thenReturn(events);
+
+        final LocalDate target = internalCallContext.toLocalDate(effectiveDate);
+
+        final InvoiceNotifier invoiceNotifier = new NullInvoiceNotifier();
+        final InvoiceDispatcher dispatcher = new InvoiceDispatcher(generator, accountApi, billingApi, subscriptionApi, invoiceDao,
+                                                                   internalCallContextFactory, invoiceNotifier, invoicePluginDispatcher, locker, busService.getBus(),
+                                                                   null, invoiceConfig, clock, parkedAccountsManager);
+
+        // Verify the __PARK__ tag definition has been created
+        final TagDefinition tagDefinition = tagUserApi.getTagDefinitionForName(ParkedAccountsManager.PARK, new DefaultTenantContext(null));
+        Assert.assertNotNull(tagDefinition);
+
+        // Verify initial tags state for account
+        Assert.assertTrue(tagUserApi.getTagsForAccount(accountId, true, callContext).isEmpty());
+
+        // Create chaos on disk
+        final InvoiceModelDao invoiceModelDao = new InvoiceModelDao(accountId,
+                                                                    target,
+                                                                    target,
+                                                                    currency,
+                                                                    false);
+        final InvoiceItemModelDao invoiceItemModelDao1 = new InvoiceItemModelDao(clock.getUTCNow(),
+                                                                                 InvoiceItemType.RECURRING,
+                                                                                 invoiceModelDao.getId(),
+                                                                                 accountId,
+                                                                                 subscription.getBundleId(),
+                                                                                 subscription.getId(),
+                                                                                 "Bad data",
+                                                                                 plan.getName(),
+                                                                                 planPhase.getName(),
+                                                                                 null,
+                                                                                 effectiveDate.toLocalDate(),
+                                                                                 effectiveDate.plusMonths(1).toLocalDate(),
+                                                                                 BigDecimal.TEN,
+                                                                                 BigDecimal.ONE,
+                                                                                 currency,
+                                                                                 null);
+        final InvoiceItemModelDao invoiceItemModelDao2 = new InvoiceItemModelDao(clock.getUTCNow(),
+                                                                                 InvoiceItemType.RECURRING,
+                                                                                 invoiceModelDao.getId(),
+                                                                                 accountId,
+                                                                                 subscription.getBundleId(),
+                                                                                 subscription.getId(),
+                                                                                 "Bad data",
+                                                                                 plan.getName(),
+                                                                                 planPhase.getName(),
+                                                                                 null,
+                                                                                 effectiveDate.plusDays(1).toLocalDate(),
+                                                                                 effectiveDate.plusMonths(1).toLocalDate(),
+                                                                                 BigDecimal.TEN,
+                                                                                 BigDecimal.ONE,
+                                                                                 currency,
+                                                                                 null);
+        invoiceDao.createInvoice(invoiceModelDao,
+                                 ImmutableList.<InvoiceItemModelDao>of(invoiceItemModelDao1, invoiceItemModelDao2),
+                                 true,
+                                 new FutureAccountNotifications(ImmutableMap.<UUID, List<SubscriptionNotification>>of()),
+                                 context);
+
+        try {
+            dispatcher.processAccount(accountId, target, new DryRunFutureDateArguments(), context);
+            Assert.fail();
+        } catch (final InvoiceApiException e) {
+            Assert.assertEquals(e.getCode(), ErrorCode.UNEXPECTED_ERROR.getCode());
+            Assert.assertTrue(e.getCause().getMessage().startsWith("Double billing detected"));
+        }
+        // Dry-run: no side effect on disk
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(context).size(), 1);
+        Assert.assertTrue(tagUserApi.getTagsForAccount(accountId, true, callContext).isEmpty());
+
+        try {
+            dispatcher.processAccount(accountId, target, null, context);
+            Assert.fail();
+        } catch (final InvoiceApiException e) {
+            Assert.assertEquals(e.getCode(), ErrorCode.UNEXPECTED_ERROR.getCode());
+            Assert.assertTrue(e.getCause().getMessage().startsWith("Double billing detected"));
+        }
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(context).size(), 1);
+        // No dry-run: account is parked
+        final List<Tag> tags = tagUserApi.getTagsForAccount(accountId, false, callContext);
+        Assert.assertEquals(tags.size(), 1);
+        Assert.assertEquals(tags.get(0).getTagDefinitionId(), tagDefinition.getId());
+
+        // isApiCall=false
+        final Invoice nullInvoice1 = dispatcher.processAccount(accountId, target, null, context);
+        Assert.assertNull(nullInvoice1);
+
+        // No dry-run and isApiCall=true
+        try {
+            dispatcher.processAccount(true, accountId, target, null, context);
+            Assert.fail();
+        } catch (final InvoiceApiException e) {
+            Assert.assertEquals(e.getCode(), ErrorCode.UNEXPECTED_ERROR.getCode());
+            Assert.assertTrue(e.getCause().getMessage().startsWith("Double billing detected"));
+        }
+        // Idempotency
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(context).size(), 1);
+        Assert.assertEquals(tagUserApi.getTagsForAccount(accountId, false, callContext), tags);
+
+        // Fix state
+        dbi.withHandle(new HandleCallback<Void>() {
+            @Override
+            public Void withHandle(final Handle handle) throws Exception {
+                handle.execute("delete from invoices");
+                handle.execute("delete from invoice_items");
+                return null;
+            }
+        });
+
+        // Dry-run and isApiCall=false: still parked
+        final Invoice nullInvoice2 = dispatcher.processAccount(accountId, target, new DryRunFutureDateArguments(), context);
+        Assert.assertNull(nullInvoice2);
+
+        // Dry-run and isApiCall=true: call goes through
+        final Invoice invoice1 = dispatcher.processAccount(true, accountId, target, new DryRunFutureDateArguments(), context);
+        Assert.assertNotNull(invoice1);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(context).size(), 0);
+        // Dry-run: still parked
+        Assert.assertEquals(tagUserApi.getTagsForAccount(accountId, false, callContext).size(), 1);
+
+        // No dry-run and isApiCall=true: call goes through
+        final Invoice invoice2 = dispatcher.processAccount(true, accountId, target, null, context);
+        Assert.assertNotNull(invoice2);
+        Assert.assertEquals(invoiceDao.getInvoicesByAccount(context).size(), 1);
+        // No dry-run: now unparked
+        Assert.assertEquals(tagUserApi.getTagsForAccount(accountId, false, callContext).size(), 0);
+        Assert.assertEquals(tagUserApi.getTagsForAccount(accountId, true, callContext).size(), 1);
+    }
+
+    @Test(groups = "slow")
     public void testWithOverdueEvents() throws Exception {
         final BillingEventSet events = new MockBillingEventSet();
 
@@ -143,7 +302,7 @@ public class TestInvoiceDispatcher extends InvoiceTestSuiteWithEmbeddedDB {
         final InvoiceNotifier invoiceNotifier = new NullInvoiceNotifier();
         final InvoiceDispatcher dispatcher = new InvoiceDispatcher(generator, accountApi, billingApi, subscriptionApi, invoiceDao,
                                                                    internalCallContextFactory, invoiceNotifier, invoicePluginDispatcher, locker, busService.getBus(),
-                                                                   null, invoiceConfig, clock);
+                                                                   null, invoiceConfig, clock, parkedAccountsManager);
 
         final Invoice invoice = dispatcher.processAccount(account.getId(), new LocalDate("2012-07-30"), null, context);
         Assert.assertNotNull(invoice);
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceHelper.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceHelper.java
index 8e22146..a7ec5de 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceHelper.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceHelper.java
@@ -162,6 +162,7 @@ public class TestInvoiceHelper {
     private final GlobalLocker locker;
     private final Clock clock;
     private final NonEntityDao nonEntityDao;
+    private final ParkedAccountsManager parkedAccountsManager;
     private final MutableInternalCallContext internalCallContext;
     private final InternalCallContextFactory internalCallContextFactory;
     private final InvoiceConfig invoiceConfig;
@@ -174,7 +175,7 @@ public class TestInvoiceHelper {
     public TestInvoiceHelper(final InvoiceGenerator generator, final IDBI dbi,
                              final BillingInternalApi billingApi, final AccountInternalApi accountApi, final ImmutableAccountInternalApi immutableAccountApi, final InvoicePluginDispatcher invoicePluginDispatcher, final AccountUserApi accountUserApi, final SubscriptionBaseInternalApi subscriptionApi, final BusService busService,
                              final InvoiceDao invoiceDao, final GlobalLocker locker, final Clock clock, final NonEntityDao nonEntityDao, final CacheControllerDispatcher cacheControllerDispatcher, final MutableInternalCallContext internalCallContext, final InvoiceConfig invoiceConfig,
-                             final InternalCallContextFactory internalCallContextFactory) {
+                             final ParkedAccountsManager parkedAccountsManager, final InternalCallContextFactory internalCallContextFactory) {
         this.generator = generator;
         this.billingApi = billingApi;
         this.accountApi = accountApi;
@@ -187,6 +188,7 @@ public class TestInvoiceHelper {
         this.locker = locker;
         this.clock = clock;
         this.nonEntityDao = nonEntityDao;
+        this.parkedAccountsManager = parkedAccountsManager;
         this.internalCallContext = internalCallContext;
         this.internalCallContextFactory = internalCallContextFactory;
         this.invoiceItemSqlDao = dbi.onDemand(InvoiceItemSqlDao.class);
@@ -213,7 +215,7 @@ public class TestInvoiceHelper {
         final InvoiceNotifier invoiceNotifier = new NullInvoiceNotifier();
         final InvoiceDispatcher dispatcher = new InvoiceDispatcher(generator, accountApi, billingApi, subscriptionApi,
                                                                    invoiceDao, internalCallContextFactory, invoiceNotifier, invoicePluginDispatcher, locker, busService.getBus(),
-                                                                   null, invoiceConfig, clock);
+                                                                   null, invoiceConfig, clock, parkedAccountsManager);
 
         Invoice invoice = dispatcher.processAccount(account.getId(), targetDate, new DryRunFutureDateArguments(), internalCallContext);
         Assert.assertNotNull(invoice);