killbill-memoizeit

Changes

account/pom.xml 2(+1 -1)

api/pom.xml 2(+1 -1)

beatrix/pom.xml 2(+1 -1)

catalog/pom.xml 2(+1 -1)

currency/pom.xml 2(+1 -1)

invoice/pom.xml 2(+1 -1)

jaxrs/pom.xml 2(+1 -1)

junction/pom.xml 2(+1 -1)

NEWS 3(+3 -0)

overdue/pom.xml 2(+1 -1)

payment/pom.xml 2(+1 -1)

payment/src/main/resources/org/killbill/billing/payment/dao/RefundSqlDao.sql.stg 87(+0 -87)

pom.xml 2(+1 -1)

profiles/pom.xml 2(+1 -1)

tenant/pom.xml 2(+1 -1)

usage/pom.xml 2(+1 -1)

util/pom.xml 2(+1 -1)

Details

account/pom.xml 2(+1 -1)

diff --git a/account/pom.xml b/account/pom.xml
index 549b678..78a4147 100644
--- a/account/pom.xml
+++ b/account/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-account</artifactId>
diff --git a/account/src/main/java/org/killbill/billing/account/api/DefaultAccount.java b/account/src/main/java/org/killbill/billing/account/api/DefaultAccount.java
index 8c452f1..ca107c9 100644
--- a/account/src/main/java/org/killbill/billing/account/api/DefaultAccount.java
+++ b/account/src/main/java/org/killbill/billing/account/api/DefaultAccount.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -359,10 +359,6 @@ public class DefaultAccount extends EntityBase implements Account {
         return new DefaultAccount(currentAccount.getId(), accountData);
     }
 
-    public ImmutableAccountData toImmutableAccountData() {
-        return new DefaultImmutableAccountData(this);
-    }
-
     @Override
     public DateTimeZone getFixedOffsetTimeZone() {
         return AccountDateTimeUtils.getFixedOffsetTimeZone(this);
diff --git a/account/src/main/java/org/killbill/billing/account/api/DefaultImmutableAccountData.java b/account/src/main/java/org/killbill/billing/account/api/DefaultImmutableAccountData.java
index 4090248..3bd4afc 100644
--- a/account/src/main/java/org/killbill/billing/account/api/DefaultImmutableAccountData.java
+++ b/account/src/main/java/org/killbill/billing/account/api/DefaultImmutableAccountData.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -30,26 +30,25 @@ public class DefaultImmutableAccountData implements ImmutableAccountData {
     private final String externalKey;
     private final Currency currency;
     private final DateTimeZone dateTimeZone;
-    private final UUID parentAccountId;
-    private final boolean isPaymentDelegatedToParent;
     private final DateTimeZone fixedOffsetDateTimeZone;
     private final DateTime referenceTime;
 
-    public DefaultImmutableAccountData(final UUID id, final String externalKey, final Currency currency, final DateTimeZone dateTimeZone, final DateTimeZone fixedOffsetDateTimeZone, final DateTime referenceTime, final UUID parentAccountId, final boolean isPaymentDelegatedToParent) {
+    public DefaultImmutableAccountData(final UUID id, final String externalKey, final Currency currency, final DateTimeZone dateTimeZone, final DateTimeZone fixedOffsetDateTimeZone, final DateTime referenceTime) {
         this.id = id;
         this.externalKey = externalKey;
         this.currency = currency;
         this.dateTimeZone = dateTimeZone;
         this.fixedOffsetDateTimeZone = fixedOffsetDateTimeZone;
         this.referenceTime = referenceTime;
-        this.parentAccountId = parentAccountId;
-        this.isPaymentDelegatedToParent = isPaymentDelegatedToParent;
     }
 
     public DefaultImmutableAccountData(final Account account) {
-        this(account.getId(), account.getExternalKey(), account.getCurrency(), account.getTimeZone(),
-             AccountDateTimeUtils.getFixedOffsetTimeZone(account), AccountDateTimeUtils.getReferenceDateTime(account),
-             account.getParentAccountId(), account.isPaymentDelegatedToParent());
+        this(account.getId(),
+             account.getExternalKey(),
+             account.getCurrency(),
+             account.getTimeZone(),
+             AccountDateTimeUtils.getFixedOffsetTimeZone(account),
+             AccountDateTimeUtils.getReferenceDateTime(account));
     }
 
     @Override
@@ -73,13 +72,17 @@ public class DefaultImmutableAccountData implements ImmutableAccountData {
     }
 
     @Override
+    @Deprecated
     public UUID getParentAccountId() {
-        return parentAccountId;
+        // Should only be used internally by ImmutableAccountInternalApi
+        throw new UnsupportedOperationException("WILL BE REMOVED IN 0.20.0");
     }
 
     @Override
+    @Deprecated
     public Boolean isPaymentDelegatedToParent() {
-        return isPaymentDelegatedToParent;
+        // Should only be used internally by ImmutableAccountInternalApi
+        throw new UnsupportedOperationException("WILL BE REMOVED IN 0.20.0");
     }
 
     public DateTimeZone getFixedOffsetTimeZone() {
diff --git a/account/src/main/java/org/killbill/billing/account/dao/DefaultAccountDao.java b/account/src/main/java/org/killbill/billing/account/dao/DefaultAccountDao.java
index b959cb7..6ca2fa3 100644
--- a/account/src/main/java/org/killbill/billing/account/dao/DefaultAccountDao.java
+++ b/account/src/main/java/org/killbill/billing/account/dao/DefaultAccountDao.java
@@ -40,6 +40,7 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
 import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
@@ -139,8 +140,8 @@ public class DefaultAccountDao extends EntityDaoBase<AccountModelDao, Account, A
                                                   }
 
                                                   @Override
-                                                  public Iterator<AccountModelDao> build(final AccountSqlDao accountSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return accountSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                  public Iterator<AccountModelDao> build(final AccountSqlDao accountSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return accountSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
diff --git a/account/src/main/resources/org/killbill/billing/account/dao/AccountSqlDao.sql.stg b/account/src/main/resources/org/killbill/billing/account/dao/AccountSqlDao.sql.stg
index f9ddfa5..937b12f 100644
--- a/account/src/main/resources/org/killbill/billing/account/dao/AccountSqlDao.sql.stg
+++ b/account/src/main/resources/org/killbill/billing/account/dao/AccountSqlDao.sql.stg
@@ -67,14 +67,32 @@ accountRecordIdFieldWithComma(prefix) ::= ""
 accountRecordIdValueWithComma(prefix) ::= ""
 
 update() ::= <<
-    UPDATE accounts
-    SET email = :email, name = :name, first_name_length = :firstNameLength,
-        currency = :currency, billing_cycle_day_local = :billingCycleDayLocal,
-        payment_method_id = :paymentMethodId, time_zone = :timeZone, locale = :locale,
-        address1 = :address1, address2 = :address2, company_name = :companyName, city = :city, state_or_province = :stateOrProvince,
-        country = :country, postal_code = :postalCode, phone = :phone, notes = :notes,
-        is_notified_for_invoices = :isNotifiedForInvoices, updated_date = :updatedDate, updated_by = :updatedBy
-    WHERE id = :id <AND_CHECK_TENANT()>;
+update accounts set
+  email = :email
+, name = :name
+, first_name_length = :firstNameLength
+, currency = :currency
+, billing_cycle_day_local = :billingCycleDayLocal
+, parent_account_id = :parentAccountId
+, is_payment_delegated_to_parent = :isPaymentDelegatedToParent
+, payment_method_id = :paymentMethodId
+, time_zone = :timeZone
+, locale = :locale
+, address1 = :address1
+, address2 = :address2
+, company_name = :companyName
+, city = :city
+, state_or_province = :stateOrProvince
+, country = :country
+, postal_code = :postalCode
+, phone = :phone
+, notes = :notes
+, is_notified_for_invoices = :isNotifiedForInvoices
+, updated_date = :updatedDate
+, updated_by = :updatedBy
+where id = :id
+<AND_CHECK_TENANT()>
+;
 >>
 
 
diff --git a/account/src/test/java/org/killbill/billing/account/api/user/TestDefaultAccountUserApi.java b/account/src/test/java/org/killbill/billing/account/api/user/TestDefaultAccountUserApi.java
index 3fa19fc..0a0d2ac 100644
--- a/account/src/test/java/org/killbill/billing/account/api/user/TestDefaultAccountUserApi.java
+++ b/account/src/test/java/org/killbill/billing/account/api/user/TestDefaultAccountUserApi.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -20,10 +20,8 @@ package org.killbill.billing.account.api.user;
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.spi.TimeZoneNameProvider;
 
 import org.joda.time.DateTimeZone;
 import org.killbill.billing.ErrorCode;
@@ -356,8 +354,73 @@ public class TestDefaultAccountUserApi extends AccountTestSuiteWithEmbeddedDB {
 
     }
 
+    @Test(groups = "slow", description = "Test un- and re-parenting")
+    public void testUnAndReParenting() throws Exception {
+        // Create child1
+        final AccountModelDao childAccountModelDao1 = createTestAccount();
+        Account childAccount1 = accountUserApi.createAccount(new DefaultAccount(childAccountModelDao1), callContext);
+        Assert.assertNull(childAccount1.getParentAccountId());
+        Assert.assertFalse(childAccount1.isPaymentDelegatedToParent());
+
+        // Create parent
+        final Account parentAccount = accountUserApi.createAccount(new DefaultAccount(createTestAccount()), callContext);
+        Assert.assertNull(parentAccount.getParentAccountId());
+        Assert.assertFalse(parentAccount.isPaymentDelegatedToParent());
+        List<Account> childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        Assert.assertEquals(childrenAccounts.size(), 0);
+
+        // Associate child1 to parent
+        childAccountModelDao1.setId(childAccount1.getId());
+        childAccountModelDao1.setParentAccountId(parentAccount.getId());
+        childAccountModelDao1.setIsPaymentDelegatedToParent(true);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao1), callContext);
+
+        // Verify mapping
+        childAccount1 = accountUserApi.getAccountById(childAccount1.getId(), callContext);
+        Assert.assertEquals(childAccount1.getParentAccountId(), parentAccount.getId());
+        Assert.assertTrue(childAccount1.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        Assert.assertEquals(childrenAccounts.size(), 1);
+        Assert.assertEquals(childrenAccounts.get(0).getId(), childAccount1.getId());
+
+        // Un-parent child1 from parent
+        childAccountModelDao1.setParentAccountId(null);
+        childAccountModelDao1.setIsPaymentDelegatedToParent(false);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao1), callContext);
+
+        // Verify mapping
+        childAccount1 = accountUserApi.getAccountById(childAccount1.getId(), callContext);
+        Assert.assertNull(childAccount1.getParentAccountId());
+        Assert.assertFalse(childAccount1.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        Assert.assertEquals(childrenAccounts.size(), 0);
+
+        // Create child2
+        final AccountModelDao childAccountModelDao2 = createTestAccount();
+        Account childAccount2 = accountUserApi.createAccount(new DefaultAccount(childAccountModelDao2), callContext);
+        Assert.assertNull(childAccount2.getParentAccountId());
+        Assert.assertFalse(childAccount2.isPaymentDelegatedToParent());
+
+        // Associate child2 to parent
+        childAccountModelDao2.setId(childAccount2.getId());
+        childAccountModelDao2.setParentAccountId(parentAccount.getId());
+        childAccountModelDao2.setIsPaymentDelegatedToParent(true);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao2), callContext);
+
+        // Verify mapping
+        childAccount1 = accountUserApi.getAccountById(childAccount1.getId(), callContext);
+        Assert.assertNull(childAccount1.getParentAccountId());
+        Assert.assertFalse(childAccount1.isPaymentDelegatedToParent());
+        childAccount2 = accountUserApi.getAccountById(childAccount2.getId(), callContext);
+        Assert.assertEquals(childAccount2.getParentAccountId(), parentAccount.getId());
+        Assert.assertTrue(childAccount2.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        Assert.assertEquals(childrenAccounts.size(), 1);
+        Assert.assertEquals(childrenAccounts.get(0).getId(), childAccount2.getId());
+    }
+
     @Test(groups = "slow", description = "Test Account creation with External Key over limit")
-        public void testCreateAccountWithExternalKeyOverLimit() throws Exception {
+    public void testCreateAccountWithExternalKeyOverLimit() throws Exception {
         AccountModelDao accountModelDao = createTestAccount();
         // Set an externalKey of 256 characters (over limit)
         accountModelDao.setExternalKey("Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis,.");

api/pom.xml 2(+1 -1)

diff --git a/api/pom.xml b/api/pom.xml
index 60fcf7c..c8dbd0c 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-internal-api</artifactId>
diff --git a/api/src/main/java/org/killbill/billing/entitlement/EventsStream.java b/api/src/main/java/org/killbill/billing/entitlement/EventsStream.java
index 42d9f4a..be4eac4 100644
--- a/api/src/main/java/org/killbill/billing/entitlement/EventsStream.java
+++ b/api/src/main/java/org/killbill/billing/entitlement/EventsStream.java
@@ -56,6 +56,8 @@ public interface EventsStream {
 
     boolean isEntitlementActive();
 
+    boolean isEntitlementPending();
+
     boolean isBlockChange();
 
     boolean isEntitlementCancelled();

beatrix/pom.xml 2(+1 -1)

diff --git a/beatrix/pom.xml b/beatrix/pom.xml
index 2bbe74a..3b45ccc 100644
--- a/beatrix/pom.xml
+++ b/beatrix/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-beatrix</artifactId>
diff --git a/beatrix/src/main/resources/org/killbill/billing/beatrix/ddl.sql b/beatrix/src/main/resources/org/killbill/billing/beatrix/ddl.sql
index 96f32e8..d5ccb71 100644
--- a/beatrix/src/main/resources/org/killbill/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/org/killbill/billing/beatrix/ddl.sql
@@ -37,3 +37,4 @@ CREATE TABLE bus_ext_events_history (
     search_key2 bigint /*! unsigned */ not null default 0,
     PRIMARY KEY(record_id)
 ) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
+CREATE INDEX bus_ext_events_history_tenant_account_record_id ON bus_ext_events_history(search_key2, search_key1);
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
index 5ed3690..f32d4bf 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationBase.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -114,6 +114,7 @@ import org.killbill.billing.util.api.TagDefinitionApiException;
 import org.killbill.billing.util.api.TagUserApi;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.billing.util.config.definition.PaymentConfig;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.nodes.KillbillNodesApi;
 import org.killbill.billing.util.tag.ControlTagType;
@@ -291,6 +292,9 @@ public class TestIntegrationBase extends BeatrixTestSuiteWithEmbeddedDB {
     @Inject
     protected ParkedAccountsManager parkedAccountsManager;
 
+    @Inject
+    protected PaymentConfig paymentConfig;
+
     protected ConfigurableInvoiceConfig invoiceConfig;
 
     protected void assertListenerStatus() {
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationParentInvoice.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationParentInvoice.java
index e759270..1cf0586 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationParentInvoice.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationParentInvoice.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -23,6 +23,8 @@ import java.util.List;
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
 import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.DefaultAccount;
+import org.killbill.billing.account.dao.AccountModelDao;
 import org.killbill.billing.api.TestApiListener.NextEvent;
 import org.killbill.billing.catalog.api.BillingActionPolicy;
 import org.killbill.billing.catalog.api.BillingPeriod;
@@ -36,15 +38,18 @@ import org.killbill.billing.invoice.api.InvoiceItem;
 import org.killbill.billing.invoice.api.InvoiceItemType;
 import org.killbill.billing.invoice.api.InvoiceStatus;
 import org.killbill.billing.payment.api.Payment;
+import org.killbill.billing.payment.api.PluginProperty;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 public class TestIntegrationParentInvoice extends TestIntegrationBase {
 
-
-
     @Test(groups = "slow")
     public void testParentInvoiceGeneration() throws Exception {
 
@@ -584,7 +589,7 @@ public class TestIntegrationParentInvoice extends TestIntegrationBase {
 
         final List<Invoice> parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
         final List<Invoice> childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
-        
+
         // get last child invoice
         Invoice childInvoice = childInvoices.get(1);
         assertEquals(childInvoice.getNumberOfItems(), 1);
@@ -1032,4 +1037,241 @@ public class TestIntegrationParentInvoice extends TestIntegrationBase {
 
     }
 
+    @Test(groups = "slow")
+    public void testUnParentingWithUnpaidInvoice() throws Exception {
+        final int billingDay = 14;
+        final DateTime initialCreationDate = new DateTime(2015, 5, 15, 0, 0, 0, 0, testTimeZone);
+        // set clock to the initial start date
+        clock.setTime(initialCreationDate);
+
+        final Account parentAccount = createAccountWithNonOsgiPaymentMethod(getAccountData(billingDay));
+        Account childAccount = createAccountWithNonOsgiPaymentMethod(getChildAccountData(billingDay, parentAccount.getId(), true));
+
+        // Verify mapping
+        childAccount = accountUserApi.getAccountById(childAccount.getId(), callContext);
+        assertEquals(childAccount.getParentAccountId(), parentAccount.getId());
+        assertTrue(childAccount.isPaymentDelegatedToParent());
+        List<Account> childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        assertEquals(childrenAccounts.size(), 1);
+        assertEquals(childrenAccounts.get(0).getId(), childAccount.getId());
+
+        // Create subscription
+        createBaseEntitlementAndCheckForCompletion(childAccount.getId(), "bundleKey1", "Shotgun", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK, NextEvent.INVOICE);
+
+        // Moving a day the NotificationQ calls the commitInvoice. No payment is expected
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(1);
+        assertListenerStatus();
+
+        // First Parent invoice over TRIAL period
+        List<Invoice> parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        assertEquals(parentInvoices.size(), 1);
+        Invoice parentInvoice = parentInvoices.get(0);
+        assertEquals(parentInvoice.getNumberOfItems(), 1);
+        assertEquals(parentInvoice.getStatus(), InvoiceStatus.COMMITTED);
+        assertTrue(parentInvoice.isParentInvoice());
+        assertEquals(parentInvoice.getBalance().compareTo(BigDecimal.ZERO), 0);
+
+        // First child invoice over TRIAL period
+        List<Invoice> childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 1);
+        assertEquals(childInvoices.get(0).getBalance().compareTo(BigDecimal.ZERO), 0);
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE);
+        clock.addDays(29);
+        assertListenerStatus();
+
+        paymentPlugin.makeNextPaymentFailWithError();
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT_ERROR, NextEvent.INVOICE_PAYMENT_ERROR);
+        clock.addDays(1);
+        assertListenerStatus();
+
+        // Second Parent invoice over Recurring period
+        parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        assertEquals(parentInvoices.size(), 2);
+        parentInvoice = parentInvoices.get(1);
+        assertEquals(parentInvoice.getNumberOfItems(), 1);
+        assertEquals(parentInvoice.getStatus(), InvoiceStatus.COMMITTED);
+        assertTrue(parentInvoice.isParentInvoice());
+        assertEquals(parentInvoice.getBalance().compareTo(new BigDecimal("249.95")), 0);
+        // The parent has attempted to pay the child invoice
+        assertEquals(parentInvoice.getPayments().size(), 1);
+        assertEquals(paymentApi.getPayment(parentInvoice.getPayments().get(0).getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), callContext).getPaymentMethodId(),
+                     parentAccount.getPaymentMethodId());
+
+        // Second child invoice over Recurring period
+        childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 2);
+        assertEquals(childInvoices.get(1).getBalance().compareTo(new BigDecimal("249.95")), 0);
+
+        // Verify balances
+        assertEquals(invoiceUserApi.getAccountBalance(parentAccount.getId(), callContext).compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(childAccount.getId(), callContext).compareTo(new BigDecimal("249.95")), 0);
+
+        // Un-parent the child
+        final AccountModelDao childAccountModelDao = new AccountModelDao(childAccount.getId(), childAccount);
+        childAccountModelDao.setParentAccountId(null);
+        childAccountModelDao.setIsPaymentDelegatedToParent(false);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao), callContext);
+
+        // Verify mapping
+        childAccount = accountUserApi.getAccountById(childAccount.getId(), callContext);
+        assertNull(childAccount.getParentAccountId());
+        assertFalse(childAccount.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        assertEquals(childrenAccounts.size(), 0);
+
+        // Verify balances
+        // TODO Should we automatically adjust the invoice at the parent level or should it be the responsibility of the user?
+        assertEquals(invoiceUserApi.getAccountBalance(parentAccount.getId(), callContext).compareTo(new BigDecimal("249.95")), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(childAccount.getId(), callContext).compareTo(new BigDecimal("249.95")), 0);
+
+        final int nbDaysBeforeRetry = paymentConfig.getPaymentFailureRetryDays(internalCallContext).get(0);
+
+        // Move time for retry to happen
+        busHandler.pushExpectedEvents(NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+        clock.addDays(nbDaysBeforeRetry + 1);
+        assertListenerStatus();
+
+        // Second Parent invoice over Recurring period
+        parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        // Note that the parent still owns both invoices
+        assertEquals(parentInvoices.size(), 2);
+        parentInvoice = parentInvoices.get(1);
+        assertEquals(parentInvoice.getNumberOfItems(), 1);
+        assertEquals(parentInvoice.getStatus(), InvoiceStatus.COMMITTED);
+        assertTrue(parentInvoice.isParentInvoice());
+        assertEquals(parentInvoice.getBalance().compareTo(BigDecimal.ZERO), 0);
+        // Even if the child-parent mapping has been removed, the parent has retried and successfully paid the summary invoice
+        // TODO Should we automatically disable payment retries when un-parenting?
+        assertEquals(parentInvoice.getPayments().size(), 1);
+        assertEquals(paymentApi.getPayment(parentInvoice.getPayments().get(0).getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), callContext).getPaymentMethodId(),
+                     parentAccount.getPaymentMethodId());
+
+        // Second child invoice over Recurring period
+        childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 2);
+        assertEquals(childInvoices.get(1).getBalance().compareTo(BigDecimal.ZERO), 0);
+
+        // Verify balances (the parent has paid the summary invoice, so the child invoice is automatically paid)
+        assertEquals(invoiceUserApi.getAccountBalance(parentAccount.getId(), callContext).compareTo(BigDecimal.ZERO), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(childAccount.getId(), callContext).compareTo(BigDecimal.ZERO), 0);
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDays(29 - nbDaysBeforeRetry - 1);
+        assertListenerStatus();
+
+        // No new invoice for the parent
+        parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        assertEquals(parentInvoices.size(), 2);
+
+        // Third child invoice over second Recurring period
+        childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 3);
+        assertEquals(childInvoices.get(2).getBalance().compareTo(BigDecimal.ZERO), 0);
+        // Verify the child paid the invoice this time
+        assertEquals(childInvoices.get(2).getPayments().size(), 1);
+        assertEquals(paymentApi.getPayment(childInvoices.get(2).getPayments().get(0).getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), callContext).getPaymentMethodId(),
+                     childAccount.getPaymentMethodId());
+
+        // Verify balances
+        assertEquals(invoiceUserApi.getAccountBalance(parentAccount.getId(), callContext).compareTo(BigDecimal.ZERO), 0);
+        assertEquals(invoiceUserApi.getAccountBalance(childAccount.getId(), callContext).compareTo(BigDecimal.ZERO), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testParentingWithFuturePhaseEvent() throws Exception {
+        final int billingDay = 14;
+        final DateTime initialCreationDate = new DateTime(2015, 5, 15, 0, 0, 0, 0, testTimeZone);
+        // set clock to the initial start date
+        clock.setTime(initialCreationDate);
+
+        final Account parentAccount = createAccountWithNonOsgiPaymentMethod(getAccountData(billingDay));
+        Account childAccount = createAccountWithNonOsgiPaymentMethod(getAccountData(billingDay));
+
+        // Verify mapping
+        childAccount = accountUserApi.getAccountById(childAccount.getId(), callContext);
+        assertNull(childAccount.getParentAccountId());
+        assertFalse(childAccount.isPaymentDelegatedToParent());
+        List<Account> childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        assertEquals(childrenAccounts.size(), 0);
+
+        // Create subscription
+        createBaseEntitlementAndCheckForCompletion(childAccount.getId(), "bundleKey1", "Shotgun", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK, NextEvent.INVOICE);
+
+        // First child invoice over TRIAL period
+        List<Invoice> childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 1);
+        assertEquals(childInvoices.get(0).getBalance().compareTo(BigDecimal.ZERO), 0);
+
+        // Add parent to the child -- the child still pays its invoices though
+        AccountModelDao childAccountModelDao = new AccountModelDao(childAccount.getId(), childAccount);
+        childAccountModelDao.setParentAccountId(parentAccount.getId());
+        childAccountModelDao.setIsPaymentDelegatedToParent(false);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao), callContext);
+
+        // Verify mapping
+        childAccount = accountUserApi.getAccountById(childAccount.getId(), callContext);
+        assertEquals(childAccount.getParentAccountId(), parentAccount.getId());
+        assertFalse(childAccount.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        assertEquals(childrenAccounts.size(), 1);
+        assertEquals(childrenAccounts.get(0).getId(), childAccount.getId());
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDays(30);
+        assertListenerStatus();
+
+        // The parent still has no invoice
+        List<Invoice> parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        assertEquals(parentInvoices.size(), 0);
+
+        // Second child invoice over Recurring period
+        childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 2);
+        assertEquals(childInvoices.get(1).getBalance().compareTo(BigDecimal.ZERO), 0);
+        assertEquals(childInvoices.get(1).getPayments().size(), 1);
+        assertEquals(paymentApi.getPayment(childInvoices.get(1).getPayments().get(0).getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), callContext).getPaymentMethodId(),
+                     childAccount.getPaymentMethodId());
+
+        // The child now delegates its payments
+        childAccountModelDao = new AccountModelDao(childAccount.getId(), childAccount);
+        childAccountModelDao.setIsPaymentDelegatedToParent(true);
+        accountUserApi.updateAccount(new DefaultAccount(childAccountModelDao), callContext);
+
+        // Verify mapping
+        childAccount = accountUserApi.getAccountById(childAccount.getId(), callContext);
+        assertEquals(childAccount.getParentAccountId(), parentAccount.getId());
+        assertTrue(childAccount.isPaymentDelegatedToParent());
+        childrenAccounts = accountUserApi.getChildrenAccounts(parentAccount.getId(), callContext);
+        assertEquals(childrenAccounts.size(), 1);
+        assertEquals(childrenAccounts.get(0).getId(), childAccount.getId());
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE);
+        clock.addDays(30);
+        assertListenerStatus();
+
+        // Moving a day the NotificationQ calls the commitInvoice. No payment is expected
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDays(1);
+        assertListenerStatus();
+
+        // The parent now owns the invoice
+        parentInvoices = invoiceUserApi.getInvoicesByAccount(parentAccount.getId(), false, callContext);
+        assertEquals(parentInvoices.size(), 1);
+        final Invoice parentInvoice = parentInvoices.get(0);
+        assertEquals(parentInvoice.getNumberOfItems(), 1);
+        assertEquals(parentInvoice.getStatus(), InvoiceStatus.COMMITTED);
+        assertTrue(parentInvoice.isParentInvoice());
+        assertEquals(parentInvoice.getBalance().compareTo(BigDecimal.ZERO), 0);
+        assertEquals(parentInvoice.getPayments().size(), 1);
+        assertEquals(paymentApi.getPayment(parentInvoice.getPayments().get(0).getPaymentId(), false, false, ImmutableList.<PluginProperty>of(), callContext).getPaymentMethodId(),
+                     parentAccount.getPaymentMethodId());
+
+        // Third child invoice over Recurring period
+        childInvoices = invoiceUserApi.getInvoicesByAccount(childAccount.getId(), false, callContext);
+        assertEquals(childInvoices.size(), 3);
+        assertEquals(childInvoices.get(2).getBalance().compareTo(BigDecimal.ZERO), 0);
+    }
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
index 4d911f6..afad9a3 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestIntegrationWithAutoPayOff.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -44,9 +44,6 @@ import static org.testng.Assert.assertTrue;
 
 public class TestIntegrationWithAutoPayOff extends TestIntegrationBase {
 
-    @Inject
-    private PaymentConfig paymentConfig;
-
     private Account account;
     private SubscriptionBaseBundle bundle;
     private String productName;

catalog/pom.xml 2(+1 -1)

diff --git a/catalog/pom.xml b/catalog/pom.xml
index f8ff03e..77babaa 100644
--- a/catalog/pom.xml
+++ b/catalog/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-catalog</artifactId>
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/DefaultLimit.java b/catalog/src/main/java/org/killbill/billing/catalog/DefaultLimit.java
index b594c03..10b11af 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/DefaultLimit.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/DefaultLimit.java
@@ -69,9 +69,7 @@ public class DefaultLimit extends ValidatingConfig<StandaloneCatalog> implements
 
     @Override
     public ValidationErrors validate(StandaloneCatalog root, ValidationErrors errors) {
-        if (max == CatalogSafetyInitializer.DEFAULT_NON_REQUIRED_DOUBLE_FIELD_VALUE && min == CatalogSafetyInitializer.DEFAULT_NON_REQUIRED_DOUBLE_FIELD_VALUE) {
-            errors.add(new ValidationError("max and min cannot both be ommitted", root.getCatalogURI(), Limit.class, ""));
-        } else if (max != CatalogSafetyInitializer.DEFAULT_NON_REQUIRED_DOUBLE_FIELD_VALUE &&
+        if (max != CatalogSafetyInitializer.DEFAULT_NON_REQUIRED_DOUBLE_FIELD_VALUE &&
                    min != CatalogSafetyInitializer.DEFAULT_NON_REQUIRED_DOUBLE_FIELD_VALUE &&
                    max.doubleValue() < min.doubleValue()) {
             errors.add(new ValidationError("max must be greater than min", root.getCatalogURI(), Limit.class, ""));
diff --git a/catalog/src/main/java/org/killbill/billing/catalog/DefaultUsage.java b/catalog/src/main/java/org/killbill/billing/catalog/DefaultUsage.java
index d0dba33..1619710 100644
--- a/catalog/src/main/java/org/killbill/billing/catalog/DefaultUsage.java
+++ b/catalog/src/main/java/org/killbill/billing/catalog/DefaultUsage.java
@@ -66,17 +66,14 @@ public class DefaultUsage extends ValidatingConfig<StandaloneCatalog> implements
     @XmlElement(required = true)
     private BillingPeriod billingPeriod;
 
-    // Used for when billing usage IN_ADVANCE & CAPACITY
     @XmlElementWrapper(name = "limits", required = false)
     @XmlElement(name = "limit", required = false)
     private DefaultLimit[] limits;
 
-    // Used for when billing usage IN_ADVANCE & CONSUMABLE
     @XmlElementWrapper(name = "blocks", required = false)
     @XmlElement(name = "block", required = false)
     private DefaultBlock[] blocks;
 
-    // Used for when billing usage IN_ARREAR
     @XmlElementWrapper(name = "tiers", required = false)
     @XmlElement(name = "tier", required = false)
     private DefaultTier[] tiers;

currency/pom.xml 2(+1 -1)

diff --git a/currency/pom.xml b/currency/pom.xml
index f9003f4..26b69c6 100644
--- a/currency/pom.xml
+++ b/currency/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-currency</artifactId>
diff --git a/entitlement/pom.xml b/entitlement/pom.xml
index c691cc9..2c34d9c 100644
--- a/entitlement/pom.xml
+++ b/entitlement/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-entitlement</artifactId>
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/api/DefaultEntitlementApi.java b/entitlement/src/main/java/org/killbill/billing/entitlement/api/DefaultEntitlementApi.java
index 8df759d..7ea0b3b 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/api/DefaultEntitlementApi.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/api/DefaultEntitlementApi.java
@@ -299,8 +299,10 @@ public class DefaultEntitlementApi extends DefaultEntitlementApiBase implements 
             public Entitlement doCall(final EntitlementApi entitlementApi, final EntitlementContext updatedPluginContext) throws EntitlementApiException {
                 final EventsStream eventsStreamForBaseSubscription = eventsStreamBuilder.buildForBaseSubscription(bundleId, callContext);
 
-                // Check the base entitlement state is active
-                if (!eventsStreamForBaseSubscription.isEntitlementActive()) {
+                if (eventsStreamForBaseSubscription.isEntitlementCancelled() ||
+                    (eventsStreamForBaseSubscription.isEntitlementPending() &&
+                     (baseEntitlementWithAddOnsSpecifier.getEntitlementEffectiveDate() == null ||
+                      baseEntitlementWithAddOnsSpecifier.getEntitlementEffectiveDate().compareTo(eventsStreamForBaseSubscription.getEntitlementEffectiveStartDate()) < 0))) {
                     throw new EntitlementApiException(ErrorCode.SUB_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
                 }
 
diff --git a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/DefaultEventsStream.java b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/DefaultEventsStream.java
index 19d7cb9..eded306 100644
--- a/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/DefaultEventsStream.java
+++ b/entitlement/src/main/java/org/killbill/billing/entitlement/engine/core/DefaultEventsStream.java
@@ -188,6 +188,11 @@ public class DefaultEventsStream implements EventsStream {
     }
 
     @Override
+    public boolean isEntitlementPending() {
+        return entitlementState == EntitlementState.PENDING;
+    }
+
+    @Override
     public boolean isEntitlementCancelled() {
         return entitlementState == EntitlementState.CANCELLED;
     }
diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
index be7245c..0d5f109 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
@@ -279,6 +279,57 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
     }
 
     @Test(groups = "slow")
+    public void testAddEntitlementOnPendingBase() throws AccountApiException, EntitlementApiException {
+        final LocalDate initialDate = new LocalDate(2013, 8, 7);
+        clock.setDay(initialDate);
+
+        final Account account = createAccount(getAccountData(7));
+
+        final PlanPhaseSpecifier spec = new PlanPhaseSpecifier("Shotgun", BillingPeriod.ANNUAL, PriceListSet.DEFAULT_PRICELIST_NAME, null);
+
+        // Create entitlement and check each field
+        final LocalDate startDate = initialDate.plusDays(10);
+        final Entitlement baseEntitlement = entitlementApi.createBaseEntitlement(account.getId(), spec, account.getExternalKey(), null, startDate, startDate, false, ImmutableList.<PluginProperty>of(), callContext);
+
+        // Add ADD_ON immediately. Because BASE is PENDING should fail
+        final PlanPhaseSpecifier spec1 = new PlanPhaseSpecifier("Telescopic-Scope", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, null);
+
+        try {
+            entitlementApi.addEntitlement(baseEntitlement.getBundleId(), spec1, null, initialDate, initialDate, false, ImmutableList.<PluginProperty>of(), callContext);
+            fail("Should not succeed to create ADD_On prior BASE is active");
+        } catch (final EntitlementApiException e) {
+            assertEquals(e.getCode(), ErrorCode.SUB_GET_NO_SUCH_BASE_SUBSCRIPTION.getCode());
+        }
+
+
+        // Add ADD_ON with a startDate similar to BASE
+        final Entitlement telescopicEntitlement = entitlementApi.addEntitlement(baseEntitlement.getBundleId(), spec1, null, startDate, startDate, false, ImmutableList.<PluginProperty>of(), callContext);
+
+        testListener.pushExpectedEvents(NextEvent.CREATE, NextEvent.CREATE, NextEvent.BLOCK, NextEvent.BLOCK);
+        clock.addDays(10);
+        assertListenerStatus();
+
+
+        assertEquals(telescopicEntitlement.getAccountId(), account.getId());
+        assertEquals(telescopicEntitlement.getExternalKey(), account.getExternalKey());
+
+        assertEquals(telescopicEntitlement.getEffectiveStartDate(), startDate);
+        assertNull(telescopicEntitlement.getEffectiveEndDate());
+
+        assertEquals(telescopicEntitlement.getLastActivePriceList().getName(), PriceListSet.DEFAULT_PRICELIST_NAME);
+        assertEquals(telescopicEntitlement.getLastActiveProduct().getName(), "Telescopic-Scope");
+        assertEquals(telescopicEntitlement.getLastActivePhase().getName(), "telescopic-scope-monthly-discount");
+        assertEquals(telescopicEntitlement.getLastActivePlan().getName(), "telescopic-scope-monthly");
+        assertEquals(telescopicEntitlement.getLastActiveProductCategory(), ProductCategory.ADD_ON);
+
+        List<Entitlement> bundleEntitlements = entitlementApi.getAllEntitlementsForBundle(telescopicEntitlement.getBundleId(), callContext);
+        assertEquals(bundleEntitlements.size(), 2);
+
+        bundleEntitlements = entitlementApi.getAllEntitlementsForAccountIdAndExternalKey(account.getId(), account.getExternalKey(), callContext);
+        assertEquals(bundleEntitlements.size(), 2);
+    }
+
+    @Test(groups = "slow")
     public void testPauseUnpause() throws AccountApiException, EntitlementApiException {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
         clock.setDay(initialDate);
@@ -319,12 +370,12 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
             assertEquals(cur.getState(), EntitlementState.BLOCKED);
         }
 
-        // Try to add an ADD_ON, it should fail
+        // Try to add an ADD_ON, it should fail because BASE is blocked
         try {
             final PlanPhaseSpecifier spec3 = new PlanPhaseSpecifier("Telescopic-Scope", BillingPeriod.MONTHLY, PriceListSet.DEFAULT_PRICELIST_NAME, null);
-            final Entitlement telescopicEntitlement3 = entitlementApi.addEntitlement(baseEntitlement.getBundleId(), spec1, null, effectiveDateSpec1, effectiveDateSpec1, false, ImmutableList.<PluginProperty>of(), callContext);
+            entitlementApi.addEntitlement(baseEntitlement.getBundleId(), spec3, null, effectiveDateSpec1, effectiveDateSpec1, false, ImmutableList.<PluginProperty>of(), callContext);
         } catch (EntitlementApiException e) {
-            assertEquals(e.getCode(), ErrorCode.SUB_GET_NO_SUCH_BASE_SUBSCRIPTION.getCode());
+            assertEquals(e.getCode(), ErrorCode.BLOCK_BLOCKED_ACTION.getCode());
         }
 
         clock.addDays(3);

invoice/pom.xml 2(+1 -1)

diff --git a/invoice/pom.xml b/invoice/pom.xml
index e713886..74f81c4 100644
--- a/invoice/pom.xml
+++ b/invoice/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-invoice</artifactId>
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
index c452e53..4d64d57 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/DefaultInvoiceDao.java
@@ -61,6 +61,7 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.config.definition.InvoiceConfig;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
 import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
@@ -371,11 +372,11 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
                                                   }
 
                                                   @Override
-                                                  public Iterator<InvoiceModelDao> build(final InvoiceSqlDao invoiceSqlDao, final Long limit, final InternalTenantContext context) {
+                                                  public Iterator<InvoiceModelDao> build(final InvoiceSqlDao invoiceSqlDao, final Long offset, final Long limit, final DefaultPaginationSqlDaoHelper.Ordering ordering, final InternalTenantContext context) {
                                                       try {
                                                           return invoiceNumber != null ?
                                                                  ImmutableList.<InvoiceModelDao>of(getByNumber(invoiceNumber, context)).iterator() :
-                                                                 invoiceSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                                 invoiceSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                       } catch (final InvoiceApiException ignored) {
                                                           return Iterators.<InvoiceModelDao>emptyIterator();
                                                       }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceDaoHelper.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceDaoHelper.java
index 139ac52..27168cf 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceDaoHelper.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceDaoHelper.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import org.killbill.billing.util.tag.Tag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Functions;
 import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
@@ -103,7 +105,6 @@ public class InvoiceDaoHelper {
         return outputItemIdsWithAmounts;
     }
 
-
     private static void computeItemAdjustmentsForTargetInvoiceItem(final InvoiceItemModelDao targetInvoiceItem, final List<InvoiceItemModelDao> adjustedOrRepairedItems, final Map<UUID, BigDecimal> inputAdjInvoiceItem, final Map<UUID, BigDecimal> outputAdjInvoiceItem) throws InvoiceApiException {
         final BigDecimal originalItemAmount = targetInvoiceItem.getAmount();
         final BigDecimal maxAdjLeftAmount = computeItemAdjustmentAmount(originalItemAmount, adjustedOrRepairedItems);
@@ -121,7 +122,7 @@ public class InvoiceDaoHelper {
 
     /**
      * @param requestedPositiveAmountToAdjust amount we are adjusting for that item
-     * @param adjustedOrRepairedItems        list of all adjusted or repaired linking to this item
+     * @param adjustedOrRepairedItems         list of all adjusted or repaired linking to this item
      * @return the amount we should really adjust based on whether or not the item got repaired
      */
     private static BigDecimal computeItemAdjustmentAmount(final BigDecimal requestedPositiveAmountToAdjust, final List<InvoiceItemModelDao> adjustedOrRepairedItems) {
@@ -224,17 +225,30 @@ public class InvoiceDaoHelper {
     }
 
     public void populateChildren(final InvoiceModelDao invoice, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext context) {
-        getInvoiceItemsWithinTransaction(ImmutableList.<InvoiceModelDao>of(invoice), entitySqlDaoWrapperFactory, context);
-        getInvoicePaymentsWithinTransaction(ImmutableList.<InvoiceModelDao>of(invoice), entitySqlDaoWrapperFactory, context);
-        setInvoiceWrittenOff(invoice, context);
-        getParentInvoice(ImmutableList.<InvoiceModelDao>of(invoice), entitySqlDaoWrapperFactory, context);
+        populateChildren(ImmutableList.<InvoiceModelDao>of(invoice), entitySqlDaoWrapperFactory, context);
     }
 
     public void populateChildren(final Iterable<InvoiceModelDao> invoices, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext context) {
+        if (Iterables.<InvoiceModelDao>isEmpty(invoices)) {
+            return;
+        }
+
         getInvoiceItemsWithinTransaction(invoices, entitySqlDaoWrapperFactory, context);
         getInvoicePaymentsWithinTransaction(invoices, entitySqlDaoWrapperFactory, context);
         setInvoicesWrittenOff(invoices, context);
-        getParentInvoice(invoices, entitySqlDaoWrapperFactory, context);
+
+        final Iterable<InvoiceModelDao> nonParentInvoices = Iterables.<InvoiceModelDao>filter(invoices,
+                                                                                              new Predicate<InvoiceModelDao>() {
+                                                                                                  @Override
+                                                                                                  public boolean apply(final InvoiceModelDao invoice) {
+                                                                                                      return !invoice.isParentInvoice();
+                                                                                                  }
+                                                                                              });
+        if (!Iterables.<InvoiceModelDao>isEmpty(nonParentInvoices)) {
+            setParentInvoice(nonParentInvoices,
+                             entitySqlDaoWrapperFactory,
+                             context);
+        }
     }
 
     public List<InvoiceModelDao> getAllInvoicesByAccountFromTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext context) {
@@ -270,7 +284,7 @@ public class InvoiceDaoHelper {
 
     private void getInvoicePaymentsWithinTransaction(final Iterable<InvoiceModelDao> invoices, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext context) {
         final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
-        final List<InvoicePaymentModelDao> invoicePaymentsForAccount = invoicePaymentSqlDao.getByAccountRecordId(context);;
+        final List<InvoicePaymentModelDao> invoicePaymentsForAccount = invoicePaymentSqlDao.getByAccountRecordId(context);
 
         final Map<UUID, List<InvoicePaymentModelDao>> invoicePaymentsPerInvoiceId = new HashMap<UUID, List<InvoicePaymentModelDao>>();
         for (final InvoicePaymentModelDao invoicePayment : invoicePaymentsForAccount) {
@@ -297,7 +311,6 @@ public class InvoiceDaoHelper {
     }
 
     private void setInvoicesWrittenOff(final Iterable<InvoiceModelDao> invoices, final InternalTenantContext internalTenantContext) {
-
         final List<Tag> tags = tagInternalApi.getTagsForAccountType(ObjectType.INVOICE, false, internalTenantContext);
         final Iterable<Tag> writtenOffTags = filterForWrittenOff(tags);
         for (final Tag cur : writtenOffTags) {
@@ -313,34 +326,71 @@ public class InvoiceDaoHelper {
         }
     }
 
-    private void setInvoiceWrittenOff(final InvoiceModelDao invoice, final InternalTenantContext internalTenantContext) {
-        final List<Tag> tags =  tagInternalApi.getTags(invoice.getId(), ObjectType.INVOICE, internalTenantContext);
-        final Iterable<Tag> writtenOffTags = filterForWrittenOff(tags);
-        invoice.setIsWrittenOff(writtenOffTags.iterator().hasNext());
-    }
-
     private Iterable<Tag> filterForWrittenOff(final List<Tag> tags) {
         return Iterables.filter(tags, new Predicate<Tag>() {
             @Override
             public boolean apply(final Tag input) {
-                return  input.getTagDefinitionId().equals(ControlTagType.WRITTEN_OFF.getId());
+                return input.getTagDefinitionId().equals(ControlTagType.WRITTEN_OFF.getId());
             }
         });
     }
 
-    private void getParentInvoice(final Iterable<InvoiceModelDao> invoices, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext internalTenantContext) {
+    private void setParentInvoice(final Iterable<InvoiceModelDao> childInvoices, final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalTenantContext childContext) {
+        final Collection<String> childInvoiceIds = new HashSet<String>();
+        for (final InvoiceModelDao childInvoice : childInvoices) {
+            childInvoiceIds.add(childInvoice.getId().toString());
+        }
+
+        // DAO: retrieve the mappings between parent and child invoices
+        final InvoiceParentChildrenSqlDao invoiceParentChildrenSqlDao = entitySqlDaoWrapperFactory.become(InvoiceParentChildrenSqlDao.class);
+        final List<InvoiceParentChildModelDao> mappings = invoiceParentChildrenSqlDao.getParentChildMappingsByChildInvoiceIds(childInvoiceIds, childContext);
+        if (mappings.isEmpty()) {
+            return;
+        }
 
+        final Map<UUID, InvoiceParentChildModelDao> mappingPerChildInvoiceId = new HashMap<UUID, InvoiceParentChildModelDao>();
+        final Collection<String> parentInvoiceIdsAsStrings = new HashSet<String>();
+        for (final InvoiceParentChildModelDao mapping : mappings) {
+            mappingPerChildInvoiceId.put(mapping.getChildInvoiceId(), mapping);
+            parentInvoiceIdsAsStrings.add(mapping.getParentInvoiceId().toString());
+        }
+
+        // DAO: retrieve all parents invoices in bulk, for all child invoices
         final InvoiceSqlDao invoiceSqlDao = entitySqlDaoWrapperFactory.become(InvoiceSqlDao.class);
-        for (InvoiceModelDao invoice : invoices) {
-            if (invoice.isParentInvoice()) continue;
-            final InvoiceModelDao parentInvoice = invoiceSqlDao.getParentInvoiceByChildInvoiceId(invoice.getId().toString(), internalTenantContext);
+        final List<InvoiceModelDao> parentInvoices = invoiceSqlDao.getByIds(parentInvoiceIdsAsStrings, childContext);
+
+        // Group the parent invoices by (parent) account id (most likely, we only have one parent account group, except in re-parenting cases)
+        final Map<UUID, List<InvoiceModelDao>> parentInvoicesGroupedByParentAccountId = new HashMap<UUID, List<InvoiceModelDao>>();
+        // Create also a convenient mapping (needed below later)
+        final Map<UUID, InvoiceModelDao> parentInvoiceByParentInvoiceId = new HashMap<UUID, InvoiceModelDao>();
+        for (final InvoiceModelDao parentInvoice : parentInvoices) {
+            if (parentInvoicesGroupedByParentAccountId.get(parentInvoice.getAccountId()) == null) {
+                parentInvoicesGroupedByParentAccountId.put(parentInvoice.getAccountId(), new LinkedList<InvoiceModelDao>());
+            }
+            parentInvoicesGroupedByParentAccountId.get(parentInvoice.getAccountId()).add(parentInvoice);
+
+            parentInvoiceByParentInvoiceId.put(parentInvoice.getId(), parentInvoice);
+        }
+
+        // DAO: populate the parent invoices in bulk
+        for (final UUID parentAccountId : parentInvoicesGroupedByParentAccountId.keySet()) {
+            final List<InvoiceModelDao> parentInvoicesForOneParentAccountId = parentInvoicesGroupedByParentAccountId.get(parentAccountId);
+            final Long parentAccountRecordId = internalCallContextFactory.getRecordIdFromObject(parentAccountId, ObjectType.ACCOUNT, internalCallContextFactory.createTenantContext(childContext));
+            final InternalTenantContext parentContext = internalCallContextFactory.createInternalTenantContext(childContext.getTenantRecordId(), parentAccountRecordId);
+            // Note the misnomer here, populateChildren simply populates the content of these invoices (unrelated to HA)
+            populateChildren(parentInvoicesForOneParentAccountId, entitySqlDaoWrapperFactory, parentContext);
+        }
+
+        for (final InvoiceModelDao invoice : childInvoices) {
+            final InvoiceParentChildModelDao mapping = mappingPerChildInvoiceId.get(invoice.getId());
+            if (mapping == null) {
+                continue;
+            }
+
+            final InvoiceModelDao parentInvoice = parentInvoiceByParentInvoiceId.get(mapping.getParentInvoiceId());
             if (parentInvoice != null) {
-                final Long parentAccountRecordId = internalCallContextFactory.getRecordIdFromObject(parentInvoice.getAccountId(), ObjectType.ACCOUNT, internalCallContextFactory.createTenantContext(internalTenantContext));
-                final InternalTenantContext parentContext = internalCallContextFactory.createInternalTenantContext(internalTenantContext.getTenantRecordId(), parentAccountRecordId);
-                populateChildren(parentInvoice, entitySqlDaoWrapperFactory, parentContext);
                 invoice.addParentInvoice(parentInvoice);
             }
         }
     }
-
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceModelDao.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceModelDao.java
index 8043b07..b3f5602 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceModelDao.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceModelDao.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -191,11 +191,12 @@ public class InvoiceModelDao extends EntityModelDaoBase implements EntityModelDa
         this.status = status;
     }
 
+    @SuppressWarnings("unused")
     public void setParentInvoice(final boolean isParentInvoice) {
         this.isParentInvoice = isParentInvoice;
     }
 
-    public void addParentInvoice(InvoiceModelDao parentInvoice) {
+    public void addParentInvoice(final InvoiceModelDao parentInvoice) {
         this.parentInvoice = parentInvoice;
     }
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.java
index 2e82ba8..2973f53 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.java
@@ -17,12 +17,15 @@
 
 package org.killbill.billing.invoice.dao;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.invoice.api.InvoiceParentChild;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
+import org.killbill.billing.util.tag.dao.UUIDCollectionBinder;
 import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
@@ -34,5 +37,9 @@ public interface InvoiceParentChildrenSqlDao extends EntitySqlDao<InvoiceParentC
     List<InvoiceParentChildModelDao> getChildInvoicesByParentInvoiceId(@Bind("parentInvoiceId") final String parentInvoiceId,
                                                                        @BindBean final InternalTenantContext context);
 
+    @SqlQuery
+    List<InvoiceParentChildModelDao> getParentChildMappingsByChildInvoiceIds(@UUIDCollectionBinder final Collection<String> childInvoiceIds,
+                                                                             @BindBean final InternalTenantContext context);
+
 }
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceSqlDao.java b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceSqlDao.java
index f4a4b98..9ed8f28 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceSqlDao.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/dao/InvoiceSqlDao.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -16,6 +18,7 @@
 
 package org.killbill.billing.invoice.dao;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
@@ -26,6 +29,7 @@ import org.killbill.billing.util.audit.ChangeType;
 import org.killbill.billing.util.entity.dao.Audited;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
+import org.killbill.billing.util.tag.dao.UUIDCollectionBinder;
 import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
@@ -45,15 +49,15 @@ public interface InvoiceSqlDao extends EntitySqlDao<InvoiceModelDao, Invoice> {
     @SqlUpdate
     @Audited(ChangeType.UPDATE)
     void updateStatus(@Bind("id") String invoiceId,
-                             @Bind("status") String status,
-                             @BindBean final InternalCallContext context);
+                      @Bind("status") String status,
+                      @BindBean final InternalCallContext context);
 
     @SqlQuery
     InvoiceModelDao getParentDraftInvoice(@Bind("accountId") final String parentAccountId,
-                               @BindBean final InternalTenantContext context);
+                                          @BindBean final InternalTenantContext context);
 
     @SqlQuery
-    InvoiceModelDao getParentInvoiceByChildInvoiceId(@Bind("childInvoiceId") final String childInvoiceId,
-                                                     @BindBean final InternalTenantContext context);
+    List<InvoiceModelDao> getByIds(@UUIDCollectionBinder final Collection<String> invoiceIds,
+                                   @BindBean final InternalTenantContext context);
 }
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/generator/UsageInvoiceItemGenerator.java b/invoice/src/main/java/org/killbill/billing/invoice/generator/UsageInvoiceItemGenerator.java
index 850b8a6..eb18aaa 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/generator/UsageInvoiceItemGenerator.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/generator/UsageInvoiceItemGenerator.java
@@ -34,7 +34,6 @@ import org.killbill.billing.catalog.api.BillingMode;
 import org.killbill.billing.catalog.api.CatalogApiException;
 import org.killbill.billing.catalog.api.Currency;
 import org.killbill.billing.catalog.api.Usage;
-import org.killbill.billing.catalog.api.UsageType;
 import org.killbill.billing.invoice.api.Invoice;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceItem;
@@ -42,8 +41,8 @@ import org.killbill.billing.invoice.api.InvoiceItemType;
 import org.killbill.billing.invoice.generator.InvoiceWithMetadata.SubscriptionFutureNotificationDates;
 import org.killbill.billing.invoice.usage.RawUsageOptimizer;
 import org.killbill.billing.invoice.usage.RawUsageOptimizer.RawUsageOptimizerResult;
-import org.killbill.billing.invoice.usage.SubscriptionConsumableInArrear;
-import org.killbill.billing.invoice.usage.SubscriptionConsumableInArrear.SubscriptionConsumableInArrearItemsAndNextNotificationDate;
+import org.killbill.billing.invoice.usage.SubscriptionUsageInArrear;
+import org.killbill.billing.invoice.usage.SubscriptionUsageInArrear.SubscriptionUsageInArrearItemsAndNextNotificationDate;
 import org.killbill.billing.junction.BillingEvent;
 import org.killbill.billing.junction.BillingEventSet;
 import org.slf4j.Logger;
@@ -77,7 +76,7 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
                                            final Currency targetCurrency,
                                            final Map<UUID, SubscriptionFutureNotificationDates> perSubscriptionFutureNotificationDates,
                                            final InternalCallContext internalCallContext) throws InvoiceApiException {
-        final Map<UUID, List<InvoiceItem>> perSubscriptionConsumableInArrearUsageItems = extractPerSubscriptionExistingConsumableInArrearUsageItems(eventSet.getUsages(), existingInvoices);
+        final Map<UUID, List<InvoiceItem>> perSubscriptionInArrearUsageItems = extractPerSubscriptionExistingInArrearUsageItems(eventSet.getUsages(), existingInvoices);
         try {
             // Pretty-print the generated invoice items from the junction events
             final InvoiceItemGeneratorLogger invoiceItemGeneratorLogger = new InvoiceItemGeneratorLogger(invoiceId, account.getId(), "usage", log);
@@ -103,24 +102,23 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
                     Iterables.any(event.getUsages(), new Predicate<Usage>() {
                         @Override
                         public boolean apply(@Nullable final Usage input) {
-                            return (input.getUsageType() == UsageType.CONSUMABLE &&
-                                    input.getBillingMode() == BillingMode.IN_ARREAR);
+                            return input.getBillingMode() == BillingMode.IN_ARREAR;
                         }
                     })) {
-                    rawUsageOptimizerResult = rawUsageOptimizer.getConsumableInArrearUsage(minBillingEventDate, targetDate, Iterables.concat(perSubscriptionConsumableInArrearUsageItems.values()), eventSet.getUsages(), internalCallContext);
+                    rawUsageOptimizerResult = rawUsageOptimizer.getInArrearUsage(minBillingEventDate, targetDate, Iterables.concat(perSubscriptionInArrearUsageItems.values()), eventSet.getUsages(), internalCallContext);
                 }
 
-                // None of the billing events report any usage (CONSUMABLE/IN_ARREAR) sections
+                // None of the billing events report any usage IN_ARREAR sections
                 if (rawUsageOptimizerResult == null) {
                     continue;
                 }
 
                 final UUID subscriptionId = event.getSubscription().getId();
                 if (curSubscriptionId != null && !curSubscriptionId.equals(subscriptionId)) {
-                    final SubscriptionConsumableInArrear subscriptionConsumableInArrear = new SubscriptionConsumableInArrear(account.getId(), invoiceId, curEvents, rawUsageOptimizerResult.getRawUsage(), targetDate, rawUsageOptimizerResult.getRawUsageStartDate(), internalCallContext);
-                    final List<InvoiceItem> consumableInUsageArrearItems = perSubscriptionConsumableInArrearUsageItems.get(curSubscriptionId);
+                    final SubscriptionUsageInArrear subscriptionUsageInArrear = new SubscriptionUsageInArrear(account.getId(), invoiceId, curEvents, rawUsageOptimizerResult.getRawUsage(), targetDate, rawUsageOptimizerResult.getRawUsageStartDate(), internalCallContext);
+                    final List<InvoiceItem> usageInArrearItems = perSubscriptionInArrearUsageItems.get(curSubscriptionId);
 
-                    final SubscriptionConsumableInArrearItemsAndNextNotificationDate subscriptionResult = subscriptionConsumableInArrear.computeMissingUsageInvoiceItems(consumableInUsageArrearItems != null ? consumableInUsageArrearItems : ImmutableList.<InvoiceItem>of(), invoiceItemGeneratorLogger);
+                    final SubscriptionUsageInArrearItemsAndNextNotificationDate subscriptionResult = subscriptionUsageInArrear.computeMissingUsageInvoiceItems(usageInArrearItems != null ? usageInArrearItems : ImmutableList.<InvoiceItem>of(), invoiceItemGeneratorLogger);
                     final List<InvoiceItem> newInArrearUsageItems = subscriptionResult.getInvoiceItems();
                     items.addAll(newInArrearUsageItems);
                     updatePerSubscriptionNextNotificationUsageDate(curSubscriptionId, subscriptionResult.getPerUsageNotificationDates(), BillingMode.IN_ARREAR, perSubscriptionFutureNotificationDates);
@@ -130,10 +128,10 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
                 curEvents.add(event);
             }
             if (curSubscriptionId != null) {
-                final SubscriptionConsumableInArrear subscriptionConsumableInArrear = new SubscriptionConsumableInArrear(account.getId(), invoiceId, curEvents, rawUsageOptimizerResult.getRawUsage(), targetDate, rawUsageOptimizerResult.getRawUsageStartDate(), internalCallContext);
-                final List<InvoiceItem> consumableInUsageArrearItems = perSubscriptionConsumableInArrearUsageItems.get(curSubscriptionId);
+                final SubscriptionUsageInArrear subscriptionUsageInArrear = new SubscriptionUsageInArrear(account.getId(), invoiceId, curEvents, rawUsageOptimizerResult.getRawUsage(), targetDate, rawUsageOptimizerResult.getRawUsageStartDate(), internalCallContext);
+                final List<InvoiceItem> usageInArrearItems = perSubscriptionInArrearUsageItems.get(curSubscriptionId);
 
-                final SubscriptionConsumableInArrearItemsAndNextNotificationDate subscriptionResult = subscriptionConsumableInArrear.computeMissingUsageInvoiceItems(consumableInUsageArrearItems != null ? consumableInUsageArrearItems : ImmutableList.<InvoiceItem>of(), invoiceItemGeneratorLogger);
+                final SubscriptionUsageInArrearItemsAndNextNotificationDate subscriptionResult = subscriptionUsageInArrear.computeMissingUsageInvoiceItems(usageInArrearItems != null ? usageInArrearItems : ImmutableList.<InvoiceItem>of(), invoiceItemGeneratorLogger);
                 final List<InvoiceItem> newInArrearUsageItems = subscriptionResult.getInvoiceItems();
                 items.addAll(newInArrearUsageItems);
                 updatePerSubscriptionNextNotificationUsageDate(curSubscriptionId, subscriptionResult.getPerUsageNotificationDates(), BillingMode.IN_ARREAR, perSubscriptionFutureNotificationDates);
@@ -172,13 +170,13 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
         }
     }
 
-    private Map<UUID, List<InvoiceItem>> extractPerSubscriptionExistingConsumableInArrearUsageItems(final Map<String, Usage> knownUsage, @Nullable final List<Invoice> existingInvoices) {
+    private Map<UUID, List<InvoiceItem>> extractPerSubscriptionExistingInArrearUsageItems(final Map<String, Usage> knownUsage, @Nullable final List<Invoice> existingInvoices) {
         if (existingInvoices == null || existingInvoices.isEmpty()) {
             return ImmutableMap.of();
         }
 
         final Map<UUID, List<InvoiceItem>> result = new HashMap<UUID, List<InvoiceItem>>();
-        final Iterable<InvoiceItem> usageConsumableInArrearItems = Iterables.concat(Iterables.transform(existingInvoices, new Function<Invoice, Iterable<InvoiceItem>>() {
+        final Iterable<InvoiceItem> usageInArrearItems = Iterables.concat(Iterables.transform(existingInvoices, new Function<Invoice, Iterable<InvoiceItem>>() {
             @Override
             public Iterable<InvoiceItem> apply(final Invoice input) {
 
@@ -187,7 +185,7 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
                     public boolean apply(final InvoiceItem input) {
                         if (input.getInvoiceItemType() == InvoiceItemType.USAGE) {
                             final Usage usage = knownUsage.get(input.getUsageName());
-                            return usage.getUsageType() == UsageType.CONSUMABLE && usage.getBillingMode() == BillingMode.IN_ARREAR;
+                            return usage.getBillingMode() == BillingMode.IN_ARREAR;
                         }
                         return false;
                     }
@@ -195,7 +193,7 @@ public class UsageInvoiceItemGenerator extends InvoiceItemGenerator {
             }
         }));
 
-        for (final InvoiceItem cur : usageConsumableInArrearItems) {
+        for (final InvoiceItem cur : usageInArrearItems) {
             List<InvoiceItem> perSubscriptionUsageItems = result.get(cur.getSubscriptionId());
             if (perSubscriptionUsageItems == null) {
                 perSubscriptionUsageItems = new LinkedList<InvoiceItem>();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index a338da9..cc85579 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -694,26 +695,21 @@ public class InvoiceDispatcher {
         try {
             final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                                                       DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
-            final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
-            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> allUpcomingEvents = Iterables.filter(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
-                @Override
-                public boolean apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
+            final Collection<DateTime> effectiveDates = new LinkedList<DateTime>();
+            for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+                final boolean isEventForSubscription = !filteredSubscriptionIds.iterator().hasNext() || Iterables.contains(filteredSubscriptionIds, input.getEvent().getUuidKey());
 
-                    final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
-                                                                  input.getEvent().isDryRunForInvoiceNotification() : false;
-                    return isEventForSubscription && !isEventDryRunForNotifications;
+                final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+                                                              input.getEvent().isDryRunForInvoiceNotification() : false;
+                if (isEventForSubscription && !isEventDryRunForNotifications) {
+                    effectiveDates.add(input.getEffectiveDate());
                 }
-            });
 
-            return Iterables.transform(allUpcomingEvents, new Function<NotificationEventWithMetadata<NextBillingDateNotificationKey>, DateTime>() {
-                @Nullable
-                @Override
-                public DateTime apply(@Nullable final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    return input.getEffectiveDate();
-                }
-            });
+            }
+
+            return effectiveDates;
         } catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
             throw new IllegalStateException(noSuchNotificationQueue);
         }
@@ -770,35 +766,52 @@ public class InvoiceDispatcher {
         }
     }
 
-    public void processParentInvoiceForInvoiceGeneration(final ImmutableAccountData account, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
+    public void processParentInvoiceForInvoiceGeneration(final Account childAccount, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
+        GlobalLock lock = null;
+        try {
+            lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), childAccount.getParentAccountId().toString(), invoiceConfig.getMaxGlobalLockRetries());
+
+            processParentInvoiceForInvoiceGenerationWithLock(childAccount, childInvoiceId, context);
+        } catch (final LockFailedException e) {
+            log.warn("Failed to process parent invoice for parentAccountId='{}'", childAccount.getParentAccountId().toString(), e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+    }
 
+    private void processParentInvoiceForInvoiceGenerationWithLock(final Account childAccount, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
+        log.info("Processing parent invoice for parentAccountId='{}', childInvoiceId='{}'", childAccount.getParentAccountId(), childInvoiceId);
         final InvoiceModelDao childInvoiceModelDao = invoiceDao.getById(childInvoiceId, context);
         final Invoice childInvoice = new DefaultInvoice(childInvoiceModelDao);
 
-        final Long parentAccountRecordId = internalCallContextFactory.getRecordIdFromObject(account.getParentAccountId(), ObjectType.ACCOUNT, buildTenantContext(context));
+        final Long parentAccountRecordId = internalCallContextFactory.getRecordIdFromObject(childAccount.getParentAccountId(), ObjectType.ACCOUNT, buildTenantContext(context));
         final InternalCallContext parentContext = internalCallContextFactory.createInternalCallContext(parentAccountRecordId, context);
 
         BigDecimal childInvoiceAmount = InvoiceCalculatorUtils.computeChildInvoiceAmount(childInvoice.getCurrency(), childInvoice.getInvoiceItems());
-        InvoiceModelDao draftParentInvoice = invoiceDao.getParentDraftInvoice(account.getParentAccountId(), parentContext);
+        InvoiceModelDao draftParentInvoice = invoiceDao.getParentDraftInvoice(childAccount.getParentAccountId(), parentContext);
 
-        final String description = account.getExternalKey().concat(" summary");
+        final String description = childAccount.getExternalKey().concat(" summary");
         if (draftParentInvoice != null) {
-
             for (InvoiceItemModelDao item : draftParentInvoice.getInvoiceItems()) {
                 if ((item.getChildAccountId() != null) && item.getChildAccountId().equals(childInvoice.getAccountId())) {
                     // update child item amount for existing parent invoice item
                     BigDecimal newChildInvoiceAmount = childInvoiceAmount.add(item.getAmount());
+                    log.info("Updating existing itemId='{}', oldAmount='{}', newAmount='{}' on existing DRAFT invoiceId='{}'", item.getId(), item.getAmount(), newChildInvoiceAmount, draftParentInvoice.getId());
                     invoiceDao.updateInvoiceItemAmount(item.getId(), newChildInvoiceAmount, parentContext);
                     return;
                 }
             }
 
             // new item when the parent invoices does not have this child item yet
-            final ParentInvoiceItem newParentInvoiceItem = new ParentInvoiceItem(UUID.randomUUID(), context.getCreatedDate(), draftParentInvoice.getId(), account.getParentAccountId(), account.getId(), childInvoiceAmount, account.getCurrency(), description);
-            draftParentInvoice.addInvoiceItem(new InvoiceItemModelDao(newParentInvoiceItem));
+            final ParentInvoiceItem newParentInvoiceItem = new ParentInvoiceItem(UUID.randomUUID(), context.getCreatedDate(), draftParentInvoice.getId(), childAccount.getParentAccountId(), childAccount.getId(), childInvoiceAmount, childAccount.getCurrency(), description);
+            final InvoiceItemModelDao parentInvoiceItem = new InvoiceItemModelDao(newParentInvoiceItem);
+            draftParentInvoice.addInvoiceItem(parentInvoiceItem);
 
             List<InvoiceModelDao> invoices = new ArrayList<InvoiceModelDao>();
             invoices.add(draftParentInvoice);
+            log.info("Adding new itemId='{}', amount='{}' on existing DRAFT invoiceId='{}'", parentInvoiceItem.getId(), childInvoiceAmount, draftParentInvoice.getId());
             invoiceDao.createInvoices(invoices, parentContext);
         } else {
             if (shouldIgnoreChildInvoice(childInvoice, childInvoiceAmount)) {
@@ -806,20 +819,20 @@ public class InvoiceDispatcher {
             }
 
             final LocalDate invoiceDate = context.toLocalDate(context.getCreatedDate());
-            draftParentInvoice = new InvoiceModelDao(account.getParentAccountId(), invoiceDate, account.getCurrency(), InvoiceStatus.DRAFT, true);
-            final InvoiceItem parentInvoiceItem = new ParentInvoiceItem(UUID.randomUUID(), context.getCreatedDate(), draftParentInvoice.getId(), account.getParentAccountId(), account.getId(), childInvoiceAmount, account.getCurrency(), description);
+            draftParentInvoice = new InvoiceModelDao(childAccount.getParentAccountId(), invoiceDate, childAccount.getCurrency(), InvoiceStatus.DRAFT, true);
+            final InvoiceItem parentInvoiceItem = new ParentInvoiceItem(UUID.randomUUID(), context.getCreatedDate(), draftParentInvoice.getId(), childAccount.getParentAccountId(), childAccount.getId(), childInvoiceAmount, childAccount.getCurrency(), description);
             draftParentInvoice.addInvoiceItem(new InvoiceItemModelDao(parentInvoiceItem));
 
             // build account date time zone
             final FutureAccountNotifications futureAccountNotifications = new FutureAccountNotifications(ImmutableMap.<UUID, List<SubscriptionNotification>>of());
 
+            log.info("Adding new itemId='{}', amount='{}' on new DRAFT invoiceId='{}'", parentInvoiceItem.getId(), childInvoiceAmount, draftParentInvoice.getId());
             invoiceDao.createInvoice(draftParentInvoice, draftParentInvoice.getInvoiceItems(), true, futureAccountNotifications, parentContext);
         }
 
         // save parent child invoice relation
-        final InvoiceParentChildModelDao invoiceRelation = new InvoiceParentChildModelDao(draftParentInvoice.getId(), childInvoiceId, account.getId());
+        final InvoiceParentChildModelDao invoiceRelation = new InvoiceParentChildModelDao(draftParentInvoice.getId(), childInvoiceId, childAccount.getId());
         invoiceDao.createParentChildInvoiceRelation(invoiceRelation, parentContext);
-
     }
 
     private boolean shouldIgnoreChildInvoice(final Invoice childInvoice, final BigDecimal childInvoiceAmount) {
@@ -839,8 +852,22 @@ public class InvoiceDispatcher {
         return true;
     }
 
-    public void processParentInvoiceForAdjustments(final ImmutableAccountData account, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
+    public void processParentInvoiceForAdjustments(final Account childAccount, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
+        GlobalLock lock = null;
+        try {
+            lock = locker.lockWithNumberOfTries(LockerType.ACCNT_INV_PAY.toString(), childAccount.getParentAccountId().toString(), invoiceConfig.getMaxGlobalLockRetries());
+
+            processParentInvoiceForAdjustmentsWithLock(childAccount, childInvoiceId, context);
+        } catch (final LockFailedException e) {
+            log.warn("Failed to process parent invoice for parentAccountId='{}'", childAccount.getParentAccountId().toString(), e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+    }
 
+    public void processParentInvoiceForAdjustmentsWithLock(final Account account, final UUID childInvoiceId, final InternalCallContext context) throws InvoiceApiException {
         final InvoiceModelDao childInvoiceModelDao = invoiceDao.getById(childInvoiceId, context);
         final InvoiceModelDao parentInvoiceModelDao = childInvoiceModelDao.getParentInvoice();
 
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index 4f20b6d..155e941 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -21,9 +21,9 @@ package org.killbill.billing.invoice;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
+import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.account.api.ImmutableAccountData;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.events.BlockingTransitionInternalEvent;
 import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
@@ -83,7 +83,6 @@ public class InvoiceListener {
         }
     }
 
-
     @AllowConcurrentEvents
     @Subscribe
     public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
@@ -127,8 +126,7 @@ public class InvoiceListener {
 
         try {
             final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            // TODO it may change to Account - #459
-            final ImmutableAccountData account = accountApi.getImmutableAccountDataById(event.getAccountId(), context);
+            final Account account = accountApi.getAccountById(event.getAccountId(), context);
 
             // catch children invoices and populate the parent summary invoice
             if (isChildrenAccountAndPaymentDelegated(account)) {
@@ -142,7 +140,7 @@ public class InvoiceListener {
         }
     }
 
-    private boolean isChildrenAccountAndPaymentDelegated(final ImmutableAccountData account) {
+    private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
         return account.getParentAccountId() != null && account.isPaymentDelegatedToParent();
     }
 
@@ -161,8 +159,7 @@ public class InvoiceListener {
 
         try {
             final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            // TODO it may change to Account - #459
-            final ImmutableAccountData account = accountApi.getImmutableAccountDataById(event.getAccountId(), context);
+            final Account account = accountApi.getAccountById(event.getAccountId(), context);
 
             // catch children invoices and populate the parent summary invoice
             if (isChildrenAccountAndPaymentDelegated(account)) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
index 9014bdb..6dac86e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -19,7 +19,6 @@
 package org.killbill.billing.invoice.notification;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -34,8 +33,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
@@ -71,23 +68,25 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
                                                                              DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
 
             // If we see existing notification for the same date (and isDryRunForInvoiceNotification mode), we don't insert a new notification
-            final List<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
-            final NotificationEventWithMetadata<NextBillingDateNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<NextBillingDateNotificationKey>>() {
-                @Override
-                public boolean apply(final NotificationEventWithMetadata<NextBillingDateNotificationKey> input) {
-                    final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
-                                                                  input.getEvent().isDryRunForInvoiceNotification() : false;
-
-                    final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
-                    final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
-
-                    return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
-                           ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
-                            (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications));
+            final Iterable<NotificationEventWithMetadata<NextBillingDateNotificationKey>> futureNotifications = nextBillingQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
+
+            boolean existingFutureNotificationWithSameDate = false;
+            for (final NotificationEventWithMetadata<NextBillingDateNotificationKey> input : futureNotifications) {
+                final boolean isEventDryRunForNotifications = input.getEvent().isDryRunForInvoiceNotification() != null ?
+                                                              input.getEvent().isDryRunForInvoiceNotification() : false;
+
+                final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+                final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+
+                if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0 &&
+                    ((isDryRunForInvoiceNotification && isEventDryRunForNotifications) ||
+                     (!isDryRunForInvoiceNotification && !isEventDryRunForNotifications))) {
+                    existingFutureNotificationWithSameDate = true;
                 }
-            }).orNull();
+                // Go through all results to close the connection
+            }
 
-            if (existingFutureNotificationWithSameDate == null) {
+            if (!existingFutureNotificationWithSameDate) {
                 log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
 
                 nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
index afd8816..43870c3 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/ParentInvoiceCommitmentPoster.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
 package org.killbill.billing.invoice.notification;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -33,8 +32,6 @@ import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificatio
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class ParentInvoiceCommitmentPoster {
@@ -58,19 +55,20 @@ public class ParentInvoiceCommitmentPoster {
                                                                                ParentInvoiceCommitmentNotifier.PARENT_INVOICE_COMMITMENT_NOTIFIER_QUEUE);
 
             // If we see existing notification for the same date we don't insert a new notification
-            final List<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
-            final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> existingFutureNotificationWithSameDate = Iterables.tryFind(futureNotifications, new Predicate<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>>() {
-                @Override
-                public boolean apply(final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input) {
+            final Iterable<NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey>> futureNotifications = commitInvoiceQueue.getFutureNotificationFromTransactionForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
 
-                    final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
-                    final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
+            boolean existingFutureNotificationWithSameDate = false;
+            for (final NotificationEventWithMetadata<ParentInvoiceCommitmentNotificationKey> input : futureNotifications) {
+                final LocalDate notificationEffectiveLocaleDate = internalCallContext.toLocalDate(futureNotificationTime);
+                final LocalDate eventEffectiveLocaleDate = internalCallContext.toLocalDate(input.getEffectiveDate());
 
-                    return notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0;
+                if (notificationEffectiveLocaleDate.compareTo(eventEffectiveLocaleDate) == 0) {
+                    existingFutureNotificationWithSameDate = true;
                 }
-            }).orNull();
+                // Go through all results to close the connection
+            }
 
-            if (existingFutureNotificationWithSameDate == null) {
+            if (!existingFutureNotificationWithSameDate) {
                 log.info("Queuing parent invoice commitment notification at {} for invoiceId {}", futureNotificationTime.toString(), invoiceId.toString());
 
                 commitInvoiceQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), futureNotificationTime,
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/usage/DefaultRolledUpUnit.java b/invoice/src/main/java/org/killbill/billing/invoice/usage/DefaultRolledUpUnit.java
index 0e1976c..6321fd5 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/usage/DefaultRolledUpUnit.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/usage/DefaultRolledUpUnit.java
@@ -38,4 +38,12 @@ public class DefaultRolledUpUnit implements RolledUpUnit {
     public Long getAmount() {
         return amount;
     }
+
+    @Override
+    public String toString() {
+        return "DefaultRolledUpUnit{" +
+               "unitType='" + unitType + '\'' +
+               ", amount=" + amount +
+               '}';
+    }
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/usage/RawUsageOptimizer.java b/invoice/src/main/java/org/killbill/billing/invoice/usage/RawUsageOptimizer.java
index 1d3efeb..295c376 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/usage/RawUsageOptimizer.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/usage/RawUsageOptimizer.java
@@ -66,7 +66,7 @@ public class RawUsageOptimizer {
         this.config = config;
     }
 
-    public RawUsageOptimizerResult getConsumableInArrearUsage(final LocalDate firstEventStartDate, final LocalDate targetDate, final Iterable<InvoiceItem> existingUsageItems, final Map<String, Usage> knownUsage, final InternalCallContext internalCallContext) {
+    public RawUsageOptimizerResult getInArrearUsage(final LocalDate firstEventStartDate, final LocalDate targetDate, final Iterable<InvoiceItem> existingUsageItems, final Map<String, Usage> knownUsage, final InternalCallContext internalCallContext) {
         final LocalDate targetStartDate = config.getMaxRawUsagePreviousPeriod(internalCallContext) > 0 ? getOptimizedRawUsageStartDate(firstEventStartDate, targetDate, existingUsageItems, knownUsage, internalCallContext) : firstEventStartDate;
         log.debug("ConsumableInArrear accountRecordId='{}', rawUsageStartDate='{}', firstEventStartDate='{}'",
                   internalCallContext.getAccountRecordId(), targetStartDate, firstEventStartDate);
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/usage/UsageUtils.java b/invoice/src/main/java/org/killbill/billing/invoice/usage/UsageUtils.java
index b3a1f5a..0a39859 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/usage/UsageUtils.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/usage/UsageUtils.java
@@ -16,28 +16,19 @@
 
 package org.killbill.billing.invoice.usage;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
 import org.killbill.billing.catalog.api.BillingMode;
+import org.killbill.billing.catalog.api.Limit;
 import org.killbill.billing.catalog.api.Tier;
 import org.killbill.billing.catalog.api.TieredBlock;
 import org.killbill.billing.catalog.api.Usage;
 import org.killbill.billing.catalog.api.UsageType;
-import org.killbill.billing.junction.BillingEvent;
-import org.killbill.billing.junction.BillingEventSet;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class UsageUtils {
@@ -72,4 +63,28 @@ public class UsageUtils {
         return result;
     }
 
+
+    public static List<Tier> getCapacityInArrearTier(final Usage usage) {
+
+        Preconditions.checkArgument(usage.getBillingMode() == BillingMode.IN_ARREAR && usage.getUsageType() == UsageType.CAPACITY);
+        Preconditions.checkArgument(usage.getTiers().length > 0);
+        return ImmutableList.copyOf(usage.getTiers());
+    }
+
+
+    public static Set<String> getCapacityInArrearUnitTypes(final Usage usage) {
+
+        Preconditions.checkArgument(usage.getBillingMode() == BillingMode.IN_ARREAR && usage.getUsageType() == UsageType.CAPACITY);
+        Preconditions.checkArgument(usage.getTiers().length > 0);
+
+        final Set<String> result = new HashSet<String>();
+        for (Tier tier : usage.getTiers()) {
+            for (Limit limit : tier.getLimits()) {
+                result.add(limit.getUnit().getName());
+            }
+        }
+        return result;
+    }
+
+
 }
diff --git a/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.sql.stg b/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.sql.stg
index e4f9726..2a96b7a 100644
--- a/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.sql.stg
+++ b/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceParentChildrenSqlDao.sql.stg
@@ -30,8 +30,16 @@ allTableValues() ::= <<
 
 getChildInvoicesByParentInvoiceId() ::= <<
    SELECT <allTableFields()>
-     FROM <tableName()>
-    WHERE parent_invoice_id = :parentInvoiceId
-    <AND_CHECK_TENANT()>
-    <defaultOrderBy()>
+   FROM <tableName()>
+   WHERE parent_invoice_id = :parentInvoiceId
+   <AND_CHECK_TENANT()>
+   <defaultOrderBy()>
+ >>
+
+getParentChildMappingsByChildInvoiceIds(ids) ::= <<
+   SELECT <allTableFields()>
+   FROM <tableName()>
+   WHERE child_invoice_id in (<ids: {id | :id_<i0>}; separator="," >)
+   <AND_CHECK_TENANT()>
+   <defaultOrderBy()>
  >>
diff --git a/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceSqlDao.sql.stg b/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceSqlDao.sql.stg
index 02de5dc..812767f 100644
--- a/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceSqlDao.sql.stg
+++ b/invoice/src/main/resources/org/killbill/billing/invoice/dao/InvoiceSqlDao.sql.stg
@@ -71,11 +71,11 @@ getParentDraftInvoice() ::= <<
    <defaultOrderBy()>
 >>
 
-getParentInvoiceByChildInvoiceId() ::= <<
-   SELECT <allTableFields("i.")>
-     FROM <tableName()> i
-     INNER JOIN invoice_parent_children ipc ON i.id = ipc.parent_invoice_id
-    WHERE ipc.child_invoice_id = :childInvoiceId
-   <AND_CHECK_TENANT("i.")>
-   <AND_CHECK_TENANT("ipc.")>
- >>
\ No newline at end of file
+getByIds(ids) ::= <<
+select
+  <allTableFields("t.")>
+from <tableName()> t
+where <idField("t.")> in (<ids: {id | :id_<i0>}; separator="," >)
+<AND_CHECK_TENANT("t.")>
+;
+>>
diff --git a/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql b/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
index 606f8bc..52f4ee7 100644
--- a/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
+++ b/invoice/src/main/resources/org/killbill/billing/invoice/ddl.sql
@@ -81,8 +81,6 @@ CREATE INDEX invoice_payments_payment_id ON invoice_payments(payment_id);
 CREATE INDEX invoice_payments_payment_cookie_id ON invoice_payments(payment_cookie_id);
 CREATE INDEX invoice_payments_tenant_account_record_id ON invoice_payments(tenant_record_id, account_record_id);
 
-
-
 DROP TABLE IF EXISTS invoice_parent_children;
 CREATE TABLE invoice_parent_children (
     record_id serial unique,
@@ -99,3 +97,4 @@ CREATE TABLE invoice_parent_children (
 CREATE UNIQUE INDEX invoice_parent_children_id ON invoice_parent_children(id);
 CREATE INDEX invoice_parent_children_invoice_id ON invoice_parent_children(parent_invoice_id);
 CREATE INDEX invoice_parent_children_tenant_account_record_id ON invoice_parent_children(tenant_record_id, account_record_id);
+CREATE INDEX invoice_parent_children_child_invoice_id ON invoice_parent_children(child_invoice_id);
diff --git a/invoice/src/main/resources/org/killbill/billing/invoice/migration/V20170222063630__invoice_parent_children_child_invoice_id_idx.sql b/invoice/src/main/resources/org/killbill/billing/invoice/migration/V20170222063630__invoice_parent_children_child_invoice_id_idx.sql
new file mode 100644
index 0000000..5b75e31
--- /dev/null
+++ b/invoice/src/main/resources/org/killbill/billing/invoice/migration/V20170222063630__invoice_parent_children_child_invoice_id_idx.sql
@@ -0,0 +1 @@
+alter table invoice_parent_children add index invoice_parent_children_child_invoice_id(child_invoice_id);
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalCapacityInArrear.java b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalCapacityInArrear.java
new file mode 100644
index 0000000..f64b95e
--- /dev/null
+++ b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalCapacityInArrear.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.usage;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.catalog.DefaultLimit;
+import org.killbill.billing.catalog.DefaultTier;
+import org.killbill.billing.catalog.DefaultTieredBlock;
+import org.killbill.billing.catalog.DefaultUnit;
+import org.killbill.billing.catalog.DefaultUsage;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.CatalogApiException;
+import org.killbill.billing.catalog.api.Currency;
+import org.killbill.billing.catalog.api.Usage;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.model.FixedPriceInvoiceItem;
+import org.killbill.billing.invoice.model.UsageInvoiceItem;
+import org.killbill.billing.invoice.usage.ContiguousIntervalUsageInArrear.UsageInArrearItemsAndNextNotificationDate;
+import org.killbill.billing.junction.BillingEvent;
+import org.killbill.billing.usage.RawUsage;
+import org.killbill.billing.usage.api.RolledUpUnit;
+import org.killbill.billing.usage.api.RolledUpUsage;
+import org.killbill.billing.usage.api.svcs.DefaultRawUsage;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class TestContiguousIntervalCapacityInArrear extends TestUsageInArrearBase {
+
+    @BeforeClass(groups = "fast")
+    protected void beforeClass() throws Exception {
+        super.beforeClass();
+    }
+
+    @BeforeMethod(groups = "fast")
+    public void beforeMethod() {
+        super.beforeMethod();
+    }
+
+    @Test(groups = "fast")
+    public void testComputeToBeBilledUsage() {
+
+        final LocalDate startDate = new LocalDate(2014, 03, 20);
+        final LocalDate endDate = new LocalDate(2014, 04, 20);
+
+        final DefaultUnit unit = new DefaultUnit().setName("unit");
+        final DefaultLimit limit = new DefaultLimit().setUnit(unit).setMax((double) 100);
+
+        final DefaultTier tier = createDefaultTierWithLimits(BigDecimal.TEN, limit);
+
+        final DefaultUsage usage = createCapacityInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
+
+        final LocalDate targetDate = startDate.plusDays(1);
+
+        final ContiguousIntervalUsageInArrear intervalCapacityInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
+                                                                                                                      createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+                                                                                                                                             BillingPeriod.MONTHLY,
+                                                                                                                                             Collections.<Usage>emptyList())
+                                                                                                                     );
+
+        final List<InvoiceItem> existingUsage = Lists.newArrayList();
+        final UsageInvoiceItem ii1 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate, endDate, BigDecimal.TEN, currency);
+        existingUsage.add(ii1);
+        final UsageInvoiceItem ii2 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate, endDate, BigDecimal.TEN, currency);
+        existingUsage.add(ii2);
+
+        // Will be ignored as is starts one day earlier.
+        final UsageInvoiceItem ii3 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate.minusDays(1), endDate, BigDecimal.TEN, currency);
+        existingUsage.add(ii3);
+
+        // Will be ignored as it is for a different udsage section
+        final UsageInvoiceItem ii4 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, "other", startDate, endDate, BigDecimal.TEN, currency);
+        existingUsage.add(ii4);
+
+        // Will be ignored because non usage item
+        final FixedPriceInvoiceItem ii5 = new FixedPriceInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, startDate, BigDecimal.TEN, currency);
+        existingUsage.add(ii5);
+
+        final Iterable<InvoiceItem> billedItems = intervalCapacityInArrear.getBilledItems(startDate, endDate, existingUsage);
+        final BigDecimal result = intervalCapacityInArrear.computeBilledUsage(billedItems);
+        assertEquals(result.compareTo(BigDecimal.TEN.add(BigDecimal.TEN)), 0);
+    }
+
+    @Test(groups = "fast")
+    public void testComputeBilledUsage() throws CatalogApiException {
+
+
+        final DefaultUnit unit1 = new DefaultUnit().setName("unit1");
+        final DefaultUnit unit2 = new DefaultUnit().setName("unit2");
+
+        final DefaultLimit limit1_1 = new DefaultLimit().setUnit(unit1).setMax((double) 100).setMin((double) -1);
+        final DefaultLimit limit1_2 = new DefaultLimit().setUnit(unit2).setMax((double) 1000).setMin((double) -1);
+        final DefaultTier tier1 = createDefaultTierWithLimits(BigDecimal.TEN, limit1_1, limit1_2);
+
+        final DefaultLimit limit2_1 = new DefaultLimit().setUnit(unit1).setMax((double) 200).setMin((double) -1);
+        final DefaultLimit limit2_2 = new DefaultLimit().setUnit(unit2).setMax((double) 2000).setMin((double) -1);
+        final DefaultTier tier2 = createDefaultTierWithLimits(new BigDecimal("20.0"), limit2_1, limit2_2);
+
+        // Don't define any max for last tier to allow any number
+        final DefaultLimit limit3_1 = new DefaultLimit().setUnit(unit1).setMin((double) -1).setMax((double) -1);
+        final DefaultLimit limit3_2 = new DefaultLimit().setUnit(unit2).setMin((double) -1).setMax((double) -1);
+        final DefaultTier tier3 = createDefaultTierWithLimits(new BigDecimal("30.0"), limit3_1, limit3_2);
+
+
+        final DefaultUsage usage = createCapacityInArrearUsage(usageName, BillingPeriod.MONTHLY, tier1, tier2, tier3);
+
+        final LocalDate targetDate = new LocalDate(2014, 03, 20);
+
+        final ContiguousIntervalUsageInArrear intervalCapacityInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
+                                                                                                                      createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+                                                                                                                                                  BillingPeriod.MONTHLY,
+                                                                                                                                                  Collections.<Usage>emptyList())
+                                                                                                                     );
+        // Tier 1 (both units from tier 1)
+        BigDecimal result = intervalCapacityInArrear.computeToBeBilledCapacityInArrear(ImmutableList.<RolledUpUnit>of(new DefaultRolledUpUnit("unit1", 100L),
+                                                                                                                      new DefaultRolledUpUnit("unit2", 1000L)));
+        assertEquals(result, BigDecimal.TEN);
+
+        // Tier 2 (only one unit from tier 1)
+        result = intervalCapacityInArrear.computeToBeBilledCapacityInArrear(ImmutableList.<RolledUpUnit>of(new DefaultRolledUpUnit("unit1", 100L),
+                                                                                                           new DefaultRolledUpUnit("unit2", 1001L)));
+        assertEquals(result, new BigDecimal("20.0"));
+
+        // Tier 2 (only one unit from tier 1)
+        result = intervalCapacityInArrear.computeToBeBilledCapacityInArrear(ImmutableList.<RolledUpUnit>of(new DefaultRolledUpUnit("unit1", 101L),
+                                                                                                           new DefaultRolledUpUnit("unit2", 1000L)));
+        assertEquals(result, new BigDecimal("20.0"));
+
+
+        // Tier 2 (both units from tier 2)
+        result = intervalCapacityInArrear.computeToBeBilledCapacityInArrear(ImmutableList.<RolledUpUnit>of(new DefaultRolledUpUnit("unit1", 101L),
+                                                                                                           new DefaultRolledUpUnit("unit2", 1001L)));
+        assertEquals(result, new BigDecimal("20.0"));
+
+        // Tier 3 (only one unit from tier 3)
+        result = intervalCapacityInArrear.computeToBeBilledCapacityInArrear(ImmutableList.<RolledUpUnit>of(new DefaultRolledUpUnit("unit1", 10L),
+                                                                                                           new DefaultRolledUpUnit("unit2", 2001L)));
+        assertEquals(result, new BigDecimal("30.0"));
+    }
+
+    @Test(groups = "fast")
+    public void testComputeMissingItems() throws CatalogApiException {
+
+        final LocalDate startDate = new LocalDate(2014, 03, 20);
+        final LocalDate firstBCDDate = new LocalDate(2014, 04, 15);
+        final LocalDate endDate = new LocalDate(2014, 05, 15);
+
+        // 2 items for startDate - firstBCDDate
+        final List<RawUsage> rawUsages = new ArrayList<RawUsage>();
+        rawUsages.add(new DefaultRawUsage(subscriptionId, new LocalDate(2014, 03, 20), "unit", 130L));
+        rawUsages.add(new DefaultRawUsage(subscriptionId, new LocalDate(2014, 03, 21), "unit", 271L));
+        // 1 items for firstBCDDate - endDate
+        rawUsages.add(new DefaultRawUsage(subscriptionId, new LocalDate(2014, 04, 15), "unit", 199L));
+
+        final DefaultUnit unit = new DefaultUnit().setName("unit");
+        final DefaultLimit limit = new DefaultLimit().setUnit(unit).setMax((double) -1);
+
+        final DefaultTier tier = createDefaultTierWithLimits(BigDecimal.TEN, limit);
+
+        final DefaultUsage usage = createCapacityInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
+
+
+        final LocalDate targetDate = endDate;
+
+        final BillingEvent event1 = createMockBillingEvent(startDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),BillingPeriod.MONTHLY, Collections.<Usage>emptyList());
+        final BillingEvent event2 = createMockBillingEvent(endDate.toDateTimeAtStartOfDay(DateTimeZone.UTC), BillingPeriod.MONTHLY, Collections.<Usage>emptyList());
+
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, rawUsages, targetDate, true, event1, event2);
+
+        final List<InvoiceItem> invoiceItems = new ArrayList<InvoiceItem>();
+        final InvoiceItem ii1 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate, firstBCDDate, BigDecimal.ONE, currency);
+        invoiceItems.add(ii1);
+
+        final InvoiceItem ii2 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), firstBCDDate, endDate, BigDecimal.ONE, currency);
+        invoiceItems.add(ii2);
+
+        final UsageInArrearItemsAndNextNotificationDate usageResult = intervalConsumableInArrear.computeMissingItemsAndNextNotificationDate(invoiceItems);
+        final List<InvoiceItem> rawResults = usageResult.getInvoiceItems();
+        assertEquals(rawResults.size(), 4);
+
+        final List<InvoiceItem> result = ImmutableList.copyOf(Iterables.filter(rawResults, new Predicate<InvoiceItem>() {
+            @Override
+            public boolean apply(final InvoiceItem input) {
+                return input.getAmount().compareTo(BigDecimal.ZERO) > 0;
+            }
+        }));
+
+
+        assertEquals(result.get(0).getAmount().compareTo(new BigDecimal("9.0")), 0, String.format("%s != 9.0", result.get(0).getAmount()));
+        assertEquals(result.get(0).getCurrency(), Currency.BTC);
+        assertEquals(result.get(0).getAccountId(), accountId);
+        assertEquals(result.get(0).getBundleId(), bundleId);
+        assertEquals(result.get(0).getSubscriptionId(), subscriptionId);
+        assertEquals(result.get(0).getPlanName(), planName);
+        assertEquals(result.get(0).getPhaseName(), phaseName);
+        assertEquals(result.get(0).getUsageName(), usage.getName());
+        assertTrue(result.get(0).getStartDate().compareTo(startDate) == 0);
+        assertTrue(result.get(0).getEndDate().compareTo(firstBCDDate) == 0);
+
+        assertEquals(result.get(1).getAmount().compareTo(new BigDecimal("9.0")), 0, String.format("%s != 9.0", result.get(0).getAmount()));
+        assertEquals(result.get(1).getCurrency(), Currency.BTC);
+        assertEquals(result.get(1).getAccountId(), accountId);
+        assertEquals(result.get(1).getBundleId(), bundleId);
+        assertEquals(result.get(1).getSubscriptionId(), subscriptionId);
+        assertEquals(result.get(1).getPlanName(), planName);
+        assertEquals(result.get(1).getPhaseName(), phaseName);
+        assertEquals(result.get(1).getUsageName(), usage.getName());
+        assertTrue(result.get(1).getStartDate().compareTo(firstBCDDate) == 0);
+        assertTrue(result.get(1).getEndDate().compareTo(endDate) == 0);
+    }
+
+
+
+}
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalConsumableInArrear.java b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalConsumableInArrear.java
index b7436b8..ea6ce5e 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalConsumableInArrear.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestContiguousIntervalConsumableInArrear.java
@@ -35,7 +35,7 @@ import org.killbill.billing.catalog.api.Usage;
 import org.killbill.billing.invoice.api.InvoiceItem;
 import org.killbill.billing.invoice.model.FixedPriceInvoiceItem;
 import org.killbill.billing.invoice.model.UsageInvoiceItem;
-import org.killbill.billing.invoice.usage.ContiguousIntervalConsumableInArrear.ConsumableInArrearItemsAndNextNotificationDate;
+import org.killbill.billing.invoice.usage.ContiguousIntervalUsageInArrear.UsageInArrearItemsAndNextNotificationDate;
 import org.killbill.billing.junction.BillingEvent;
 import org.killbill.billing.usage.RawUsage;
 import org.killbill.billing.usage.api.RolledUpUsage;
@@ -83,15 +83,15 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
         final LocalDate endDate = new LocalDate(2014, 04, 20);
 
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
 
         final LocalDate targetDate = startDate.plusDays(1);
-        final ContiguousIntervalConsumableInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
-                                                                                                                           createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
+                                                                                                                      createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
                                                                                                                                                   BillingPeriod.MONTHLY,
                                                                                                                                                   Collections.<Usage>emptyList())
-                                                                                                                          );
+                                                                                                                     );
 
         final List<InvoiceItem> existingUsage = Lists.newArrayList();
         final UsageInvoiceItem ii1 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate, endDate, BigDecimal.TEN, currency);
@@ -119,21 +119,21 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
     public void testComputeBilledUsage() throws CatalogApiException {
 
         final DefaultTieredBlock block1 = createDefaultTieredBlock("unit", 100, 10, BigDecimal.ONE);
-        final DefaultTier tier1 = createDefaultTier(block1);
+        final DefaultTier tier1 = createDefaultTierWithBlocks(block1);
 
         final DefaultTieredBlock block2 = createDefaultTieredBlock("unit", 1000, 100, BigDecimal.ONE);
-        final DefaultTier tier2 = createDefaultTier(block2);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier1, tier2);
+        final DefaultTier tier2 = createDefaultTierWithBlocks(block2);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier1, tier2);
 
         final LocalDate targetDate = new LocalDate(2014, 03, 20);
 
-        final ContiguousIntervalConsumableInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
-                                                                                                                           createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, ImmutableList.<RawUsage>of(), targetDate, false,
+                                                                                                                      createMockBillingEvent(targetDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),
                                                                                                                                                   BillingPeriod.MONTHLY,
                                                                                                                                                   Collections.<Usage>emptyList())
-                                                                                                                          );
+                                                                                                                     );
 
-        final BigDecimal result = intervalConsumableInArrear.computeToBeBilledUsage(5325L, "unit");
+        final BigDecimal result = intervalConsumableInArrear.computeToBeBilledConsumableInArrear(new DefaultRolledUpUnit("unit", 5325L));
 
         // 5000 = 1000 (tier1) + 4325 (tier2) => 10 + 5 = 15
         assertEquals(result, new BigDecimal("15"));
@@ -154,15 +154,15 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
         rawUsages.add(new DefaultRawUsage(subscriptionId, new LocalDate(2014, 04, 15), "unit", 199L));
 
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 10, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
 
         final LocalDate targetDate = endDate;
 
         final BillingEvent event1 = createMockBillingEvent(startDate.toDateTimeAtStartOfDay(DateTimeZone.UTC),BillingPeriod.MONTHLY, Collections.<Usage>emptyList());
         final BillingEvent event2 = createMockBillingEvent(endDate.toDateTimeAtStartOfDay(DateTimeZone.UTC), BillingPeriod.MONTHLY, Collections.<Usage>emptyList());
 
-        final ContiguousIntervalConsumableInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, rawUsages, targetDate, true, event1, event2);
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, rawUsages, targetDate, true, event1, event2);
 
         final List<InvoiceItem> invoiceItems = new ArrayList<InvoiceItem>();
         final InvoiceItem ii1 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), startDate, firstBCDDate, BigDecimal.ONE, currency);
@@ -171,7 +171,7 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
         final InvoiceItem ii2 = new UsageInvoiceItem(invoiceId, accountId, bundleId, subscriptionId, planName, phaseName, usage.getName(), firstBCDDate, endDate, BigDecimal.ONE, currency);
         invoiceItems.add(ii2);
 
-        final ConsumableInArrearItemsAndNextNotificationDate usageResult = intervalConsumableInArrear.computeMissingItemsAndNextNotificationDate(invoiceItems);
+        final UsageInArrearItemsAndNextNotificationDate usageResult = intervalConsumableInArrear.computeMissingItemsAndNextNotificationDate(invoiceItems);
         final List<InvoiceItem> rawResults = usageResult.getInvoiceItems();
         assertEquals(rawResults.size(), 4);
 
@@ -213,10 +213,10 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
 
         final DefaultTieredBlock tieredBlock1 = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
         final DefaultTieredBlock tieredBlock2 = createDefaultTieredBlock("unit2", 10, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(tieredBlock1, tieredBlock2);
+        final DefaultTier tier = createDefaultTierWithBlocks(tieredBlock1, tieredBlock2);
 
 
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
 
 
         final LocalDate t0 = new LocalDate(2015, 03, BCD);
@@ -253,7 +253,7 @@ public class TestContiguousIntervalConsumableInArrear extends TestUsageInArrearB
 
         final List<RawUsage> rawUsage = ImmutableList.of(raw1, raw2, raw3, raw4, oraw1, raw5, raw6);
 
-        final ContiguousIntervalConsumableInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, rawUsage, targetDate, true, eventT0, eventT1, eventT2, eventT3);
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = createContiguousIntervalConsumableInArrear(usage, rawUsage, targetDate, true, eventT0, eventT1, eventT2, eventT3);
 
 
         final List<RolledUpUsage> unsortedRolledUpUsage =  intervalConsumableInArrear.getRolledUpUsage();
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestRawUsageOptimizer.java b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestRawUsageOptimizer.java
index 292ec21..38ab0f8 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestRawUsageOptimizer.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestRawUsageOptimizer.java
@@ -46,8 +46,8 @@ public class TestRawUsageOptimizer extends TestUsageInArrearBase {
 
         final Map<String, Usage> knownUsage = new HashMap<String, Usage>();
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
         knownUsage.put(usageName, usage);
 
         final LocalDate result = rawUsageOptimizer.getOptimizedRawUsageStartDate(firstEventStartDate, firstEventStartDate.plusDays(1), invoiceItems, knownUsage, internalCallContext);
@@ -65,8 +65,8 @@ public class TestRawUsageOptimizer extends TestUsageInArrearBase {
 
         final Map<String, Usage> knownUsage = new HashMap<String, Usage>();
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
         knownUsage.put(usageName, usage);
 
         final LocalDate result = rawUsageOptimizer.getOptimizedRawUsageStartDate(firstEventStartDate, targetDate, invoiceItems, knownUsage, internalCallContext);
@@ -88,8 +88,8 @@ public class TestRawUsageOptimizer extends TestUsageInArrearBase {
 
         final Map<String, Usage> knownUsage = new HashMap<String, Usage>();
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
         knownUsage.put(usageName, usage);
 
         final LocalDate result = rawUsageOptimizer.getOptimizedRawUsageStartDate(firstEventStartDate, targetDate, invoiceItems, knownUsage, internalCallContext);
@@ -110,13 +110,13 @@ public class TestRawUsageOptimizer extends TestUsageInArrearBase {
 
         final Map<String, Usage> knownUsage = new HashMap<String, Usage>();
         final DefaultTieredBlock block = createDefaultTieredBlock("unit", 100, 1000, BigDecimal.ONE);
-        final DefaultTier tier = createDefaultTier(block);
-        final DefaultUsage usage = createDefaultUsage(usageName, BillingPeriod.MONTHLY, tier);
+        final DefaultTier tier = createDefaultTierWithBlocks(block);
+        final DefaultUsage usage = createConsumableInArrearUsage(usageName, BillingPeriod.MONTHLY, tier);
         knownUsage.put(usageName, usage);
 
         final DefaultTieredBlock block2 = createDefaultTieredBlock("unit2", 10, 10000, BigDecimal.TEN);
-        final DefaultTier tier2 = createDefaultTier(block2);
-        final DefaultUsage usage2 = createDefaultUsage("usageName2", BillingPeriod.ANNUAL, tier2);
+        final DefaultTier tier2 = createDefaultTierWithBlocks(block2);
+        final DefaultUsage usage2 = createConsumableInArrearUsage("usageName2", BillingPeriod.ANNUAL, tier2);
         knownUsage.put("usageName2", usage2);
 
         final LocalDate result = rawUsageOptimizer.getOptimizedRawUsageStartDate(firstEventStartDate, targetDate, invoiceItems, knownUsage, internalCallContext);
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestSubscriptionConsumableInArrear.java b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestSubscriptionConsumableInArrear.java
index 19a4b45..3e6b03a 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestSubscriptionConsumableInArrear.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestSubscriptionConsumableInArrear.java
@@ -52,13 +52,13 @@ public class TestSubscriptionConsumableInArrear extends TestUsageInArrearBase {
 
         final String usageName1 = "erw";
         final DefaultTieredBlock block1 = createDefaultTieredBlock("unit", 100, 10, BigDecimal.ONE);
-        final DefaultTier tier1 = createDefaultTier(block1);
-        final Usage usage1 = createDefaultUsage(usageName1, BillingPeriod.MONTHLY, tier1);
+        final DefaultTier tier1 = createDefaultTierWithBlocks(block1);
+        final Usage usage1 = createConsumableInArrearUsage(usageName1, BillingPeriod.MONTHLY, tier1);
 
         final String usageName2 = "hghg";
         final DefaultTieredBlock block2 = createDefaultTieredBlock("unit", 100, 10, BigDecimal.ONE);
-        final DefaultTier tier2 = createDefaultTier(block2);
-        final Usage usage2 = createDefaultUsage(usageName2, BillingPeriod.MONTHLY, tier2);
+        final DefaultTier tier2 = createDefaultTierWithBlocks(block2);
+        final Usage usage2 = createConsumableInArrearUsage(usageName2, BillingPeriod.MONTHLY, tier2);
 
         final DateTime dt1 = new DateTime(2013, 3, 23, 4, 34, 59, DateTimeZone.UTC);
         final BillingEvent evt1 = createMockBillingEvent(dt1, BillingPeriod.MONTHLY, ImmutableList.<Usage>builder().add(usage1).add(usage2).build());
@@ -74,8 +74,8 @@ public class TestSubscriptionConsumableInArrear extends TestUsageInArrearBase {
 
         LocalDate targetDate = new LocalDate(2013, 6, 23);
 
-        final SubscriptionConsumableInArrear foo = new SubscriptionConsumableInArrear(accountId, invoiceId, billingEvents, ImmutableList.<RawUsage>of(), targetDate, new LocalDate(dt1, DateTimeZone.UTC), internalCallContext);
-        final List<ContiguousIntervalConsumableInArrear> result = foo.computeInArrearUsageInterval();
+        final SubscriptionUsageInArrear foo = new SubscriptionUsageInArrear(accountId, invoiceId, billingEvents, ImmutableList.<RawUsage>of(), targetDate, new LocalDate(dt1, DateTimeZone.UTC), internalCallContext);
+        final List<ContiguousIntervalUsageInArrear> result = foo.computeInArrearUsageInterval();
         assertEquals(result.size(), 3);
 
         assertEquals(result.get(0).getUsage().getName(), usageName2);
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestUsageInArrearBase.java b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestUsageInArrearBase.java
index 838854d..d766b3a 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/usage/TestUsageInArrearBase.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/usage/TestUsageInArrearBase.java
@@ -26,6 +26,7 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDate;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.catalog.DefaultInternationalPrice;
+import org.killbill.billing.catalog.DefaultLimit;
 import org.killbill.billing.catalog.DefaultPrice;
 import org.killbill.billing.catalog.DefaultTier;
 import org.killbill.billing.catalog.DefaultTieredBlock;
@@ -71,8 +72,8 @@ public abstract class TestUsageInArrearBase extends InvoiceTestSuiteNoDB {
         currency = Currency.BTC;
     }
 
-    protected ContiguousIntervalConsumableInArrear createContiguousIntervalConsumableInArrear(final DefaultUsage usage, final List<RawUsage> rawUsages, final LocalDate targetDate, final boolean closedInterval, final BillingEvent... events) {
-        final ContiguousIntervalConsumableInArrear intervalConsumableInArrear = new ContiguousIntervalConsumableInArrear(usage, accountId, invoiceId, rawUsages, targetDate, new LocalDate(events[0].getEffectiveDate()), internalCallContext);
+    protected ContiguousIntervalUsageInArrear createContiguousIntervalConsumableInArrear(final DefaultUsage usage, final List<RawUsage> rawUsages, final LocalDate targetDate, final boolean closedInterval, final BillingEvent... events) {
+        final ContiguousIntervalUsageInArrear intervalConsumableInArrear = new ContiguousIntervalUsageInArrear(usage, accountId, invoiceId, rawUsages, targetDate, new LocalDate(events[0].getEffectiveDate()), internalCallContext);
         for (final BillingEvent event : events) {
             intervalConsumableInArrear.addBillingEvent(event);
         }
@@ -80,7 +81,7 @@ public abstract class TestUsageInArrearBase extends InvoiceTestSuiteNoDB {
         return intervalConsumableInArrear;
     }
 
-    protected DefaultUsage createDefaultUsage(final String usageName, final BillingPeriod billingPeriod, final DefaultTier... tiers) {
+    protected DefaultUsage createConsumableInArrearUsage(final String usageName, final BillingPeriod billingPeriod, final DefaultTier... tiers) {
         final DefaultUsage usage = new DefaultUsage();
         usage.setName(usageName);
         usage.setBillingMode(BillingMode.IN_ARREAR);
@@ -90,12 +91,33 @@ public abstract class TestUsageInArrearBase extends InvoiceTestSuiteNoDB {
         return usage;
     }
 
-    protected DefaultTier createDefaultTier(final DefaultTieredBlock... blocks) {
+    protected DefaultUsage createCapacityInArrearUsage(final String usageName, final BillingPeriod billingPeriod, final DefaultTier... tiers) {
+        final DefaultUsage usage = new DefaultUsage();
+        usage.setName(usageName);
+        usage.setBillingMode(BillingMode.IN_ARREAR);
+        usage.setUsageType(UsageType.CAPACITY);
+        usage.setBillingPeriod(billingPeriod);
+        usage.setTiers(tiers);
+        return usage;
+    }
+
+    protected DefaultTier createDefaultTierWithBlocks(final DefaultTieredBlock... blocks) {
         final DefaultTier tier = new DefaultTier();
         tier.setBlocks(blocks);
         return tier;
     }
 
+    protected DefaultTier createDefaultTierWithLimits(final BigDecimal recurringAmountInCurrency, final DefaultLimit... limits) {
+        final DefaultTier tier = new DefaultTier();
+        tier.setLimits(limits);
+
+        final DefaultPrice[] prices = new DefaultPrice[1];
+        prices[0] = new DefaultPrice().setCurrency(currency).setValue(recurringAmountInCurrency);
+        final DefaultInternationalPrice price = new DefaultInternationalPrice().setPrices(prices);
+        tier.setRecurringPrice(price);
+        return tier;
+    }
+
     protected DefaultTieredBlock createDefaultTieredBlock(final String unit, final int size, final int max, final BigDecimal btcPrice) {
         final DefaultTieredBlock block = new DefaultTieredBlock();
         block.setUnit(new DefaultUnit().setName(unit));

jaxrs/pom.xml 2(+1 -1)

diff --git a/jaxrs/pom.xml b/jaxrs/pom.xml
index 5a0b837..b76c36c 100644
--- a/jaxrs/pom.xml
+++ b/jaxrs/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-jaxrs</artifactId>
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/PaymentMethodJson.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/PaymentMethodJson.java
index 77a5acd..6a6d997 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/PaymentMethodJson.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/PaymentMethodJson.java
@@ -33,6 +33,7 @@ import org.killbill.billing.payment.api.PluginProperty;
 import org.killbill.billing.util.audit.AccountAuditLogs;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
@@ -66,6 +67,7 @@ public class PaymentMethodJson extends JsonBase {
         this.pluginInfo = pluginInfo;
     }
 
+
     public static PaymentMethodJson toPaymentMethodJson(final Account account, final PaymentMethod in, @Nullable final AccountAuditLogs accountAuditLogs) {
         final boolean isDefault = account.getPaymentMethodId() != null && account.getPaymentMethodId().equals(in.getId());
         final PaymentMethodPlugin pluginDetail = in.getPluginDetail();
@@ -185,6 +187,7 @@ public class PaymentMethodJson extends JsonBase {
         return externalKey;
     }
 
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("PaymentMethodJson{");
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/SubscriptionJson.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/SubscriptionJson.java
index 5e17d23..b3f9a9e 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/SubscriptionJson.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/json/SubscriptionJson.java
@@ -349,32 +349,32 @@ public class SubscriptionJson extends JsonBase {
         final List<SubscriptionEvent> subscriptionEvents = subscription.getSubscriptionEvents();
         final SubscriptionEvent firstEvent = subscriptionEvents.isEmpty() ? null : subscriptionEvents.get(0);
         if (subscription.getLastActiveProduct() == null) {
-            this.productName = firstEvent == null ? null : firstEvent.getNextProduct().getName();
+            this.productName = (firstEvent == null || firstEvent.getNextProduct() == null) ? null : firstEvent.getNextProduct().getName();
         } else {
             this.productName = subscription.getLastActiveProduct().getName();
         }
         if (subscription.getLastActiveProductCategory() == null) {
-            this.productCategory = firstEvent == null ? null : firstEvent.getNextProduct().getCategory().name();
+            this.productCategory = (firstEvent == null || firstEvent.getNextProduct() == null) ? null : firstEvent.getNextProduct().getCategory().name();
         } else {
             this.productCategory = subscription.getLastActiveProductCategory().name();
         }
         if (subscription.getLastActivePlan() == null) {
-            this.billingPeriod = firstEvent == null ? null : firstEvent.getNextPlan().getRecurringBillingPeriod().name();
+            this.billingPeriod = (firstEvent == null || firstEvent.getNextPlan() == null) ? null : firstEvent.getNextPlan().getRecurringBillingPeriod().name();
         } else {
             this.billingPeriod = subscription.getLastActivePlan().getRecurringBillingPeriod().toString();
         }
         if (subscription.getLastActivePhase() == null) {
-            this.phaseType = firstEvent == null ? null : firstEvent.getNextPhase().getPhaseType().name();
+            this.phaseType = (firstEvent == null || firstEvent.getNextPhase() == null) ? null : firstEvent.getNextPhase().getPhaseType().name();
         } else {
             this.phaseType = subscription.getLastActivePhase().getPhaseType().toString();
         }
         if (subscription.getLastActivePriceList() == null) {
-            this.priceList = firstEvent == null ? null : firstEvent.getNextPriceList().getName();
+            this.priceList = (firstEvent == null || firstEvent.getNextPriceList() == null) ? null : firstEvent.getNextPriceList().getName();
         } else {
             this.priceList = subscription.getLastActivePriceList().getName();
         }
         if (subscription.getLastActivePlan() == null) {
-            this.planName = firstEvent == null ? null : firstEvent.getNextPlan().getName();
+            this.planName = (firstEvent == null || firstEvent.getNextPlan() == null) ? null : firstEvent.getNextPlan().getName();
         } else {
             this.planName = subscription.getLastActivePlan().getName();
         }
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
index b391d41..27168ba 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AdminResource.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -20,10 +20,7 @@ package org.killbill.billing.jaxrs.resources;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Iterator;
 import java.util.UUID;
 
 import javax.annotation.Nullable;
@@ -90,7 +87,6 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
 import com.google.inject.Singleton;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -152,6 +148,7 @@ public class AdminResource extends JaxRsResourceBase {
                                     @QueryParam("serviceName") final String serviceName,
                                     @QueryParam("withHistory") @DefaultValue("true") final Boolean withHistory,
                                     @QueryParam("minDate") final String minDateOrNull,
+                                    @QueryParam("maxDate") final String maxDateOrNull,
                                     @QueryParam("withInProcessing") @DefaultValue("true") final Boolean withInProcessing,
                                     @QueryParam("withBusEvents") @DefaultValue("true") final Boolean withBusEvents,
                                     @QueryParam("withNotifications") @DefaultValue("true") final Boolean withNotifications,
@@ -161,36 +158,59 @@ public class AdminResource extends JaxRsResourceBase {
         final Long accountRecordId = Strings.isNullOrEmpty(accountIdStr) ? null : recordIdApi.getRecordId(UUID.fromString(accountIdStr), ObjectType.ACCOUNT, tenantContext);
 
         // Limit search results by default
-        final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusMonths(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+        final DateTime minDate = Strings.isNullOrEmpty(minDateOrNull) ? clock.getUTCNow().minusDays(2) : DATE_TIME_FORMATTER.parseDateTime(minDateOrNull).toDateTime(DateTimeZone.UTC);
+        final DateTime maxDate = Strings.isNullOrEmpty(maxDateOrNull) ? clock.getUTCNow().plusDays(2) : DATE_TIME_FORMATTER.parseDateTime(maxDateOrNull).toDateTime(DateTimeZone.UTC);
 
         final StreamingOutput json = new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException, WebApplicationException {
-                final JsonGenerator generator = mapper.getFactory().createGenerator(output);
-                generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
+                Iterator<BusEventWithMetadata<BusEvent>> busEventsIterator = null;
+                Iterator<NotificationEventWithMetadata<NotificationEvent>> notificationsIterator = null;
+
+                try {
+                    final JsonGenerator generator = mapper.getFactory().createGenerator(output);
+                    generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
+
+                    generator.writeStartObject();
+
+                    if (withBusEvents) {
+                        generator.writeFieldName("busEvents");
+                        generator.writeStartArray();
+                        busEventsIterator = getBusEvents(withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId).iterator();
+                        while (busEventsIterator.hasNext()) {
+                            final BusEventWithMetadata<BusEvent> busEvent = busEventsIterator.next();
+                            generator.writeObject(new BusEventWithRichMetadata(busEvent));
+                        }
+                        generator.writeEndArray();
+                    }
 
-                generator.writeStartObject();
+                    if (withNotifications) {
+                        generator.writeFieldName("notifications");
+                        generator.writeStartArray();
 
-                if (withBusEvents) {
-                    generator.writeFieldName("busEvents");
-                    generator.writeStartArray();
-                    for (final BusEventWithMetadata<BusEvent> busEvent : getBusEvents(withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
-                        generator.writeObject(new BusEventWithRichMetadata(busEvent));
+                        notificationsIterator = getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, maxDate, accountRecordId, tenantRecordId).iterator();
+                        while (notificationsIterator.hasNext()) {
+                            final NotificationEventWithMetadata<NotificationEvent> notification = notificationsIterator.next();
+                            generator.writeObject(notification);
+                        }
+                        generator.writeEndArray();
                     }
-                    generator.writeEndArray();
-                }
 
-                if (withNotifications) {
-                    generator.writeFieldName("notifications");
-                    generator.writeStartArray();
-                    for (final NotificationEventWithMetadata<NotificationEvent> notification : getNotifications(queueName, serviceName, withInProcessing, withHistory, minDate, accountRecordId, tenantRecordId)) {
-                        generator.writeObject(notification);
+                    generator.writeEndObject();
+                    generator.close();
+                } finally {
+                    // In case the client goes away (IOException), make sure to close the underlying DB connection
+                    if (busEventsIterator != null) {
+                        while (busEventsIterator.hasNext()) {
+                            busEventsIterator.next();
+                        }
+                    }
+                    if (notificationsIterator != null) {
+                        while (notificationsIterator.hasNext()) {
+                            notificationsIterator.next();
+                        }
                     }
-                    generator.writeEndArray();
                 }
-
-                generator.writeEndObject();
-                generator.close();
             }
         };
 
@@ -245,28 +265,37 @@ public class AdminResource extends JaxRsResourceBase {
 
         // TODO Consider adding a real invoice API post 0.18.x
         final Pagination<Tag> tags = tagUserApi.searchTags(SystemTags.PARK_TAG_DEFINITION_NAME, offset, limit, callContext);
+        final Iterator<Tag> iterator = tags.iterator();
 
         final StreamingOutput json = new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException, WebApplicationException {
-                final JsonGenerator generator = mapper.getFactory().createGenerator(output);
-                generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
-
-                generator.writeStartObject();
-                for (final Tag tag : tags) {
-                    final UUID accountId = tag.getObjectId();
-                    try {
-                        invoiceUserApi.triggerInvoiceGeneration(accountId, clock.getUTCToday(), null, callContext);
-                        generator.writeStringField(accountId.toString(), OK);
-                    } catch (final InvoiceApiException e) {
-                        if (e.getCode() != ErrorCode.INVOICE_NOTHING_TO_DO.getCode()) {
-                            log.warn("Unable to trigger invoice generation for accountId='{}'", accountId);
+                try {
+                    final JsonGenerator generator = mapper.getFactory().createGenerator(output);
+                    generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
+
+                    generator.writeStartObject();
+                    while (iterator.hasNext()) {
+                        final Tag tag = iterator.next();
+                        final UUID accountId = tag.getObjectId();
+                        try {
+                            invoiceUserApi.triggerInvoiceGeneration(accountId, clock.getUTCToday(), null, callContext);
+                            generator.writeStringField(accountId.toString(), OK);
+                        } catch (final InvoiceApiException e) {
+                            if (e.getCode() != ErrorCode.INVOICE_NOTHING_TO_DO.getCode()) {
+                                log.warn("Unable to trigger invoice generation for accountId='{}'", accountId);
+                            }
+                            generator.writeStringField(accountId.toString(), ErrorCode.fromCode(e.getCode()).toString());
                         }
-                        generator.writeStringField(accountId.toString(), ErrorCode.fromCode(e.getCode()).toString());
+                    }
+                    generator.writeEndObject();
+                    generator.close();
+                } finally {
+                    // In case the client goes away (IOException), make sure to close the underlying DB connection
+                    while (iterator.hasNext()) {
+                        iterator.next();
                     }
                 }
-                generator.writeEndObject();
-                generator.close();
             }
         };
 
@@ -391,9 +420,10 @@ public class AdminResource extends JaxRsResourceBase {
                                                                                         final boolean includeInProcessing,
                                                                                         final boolean includeHistory,
                                                                                         @Nullable final DateTime minEffectiveDate,
+                                                                                        @Nullable final DateTime maxEffectiveDate,
                                                                                         @Nullable final Long accountRecordId,
                                                                                         final Long tenantRecordId) {
-        final Collection<NotificationEventWithMetadata<NotificationEvent>> notifications = new LinkedList<NotificationEventWithMetadata<NotificationEvent>>();
+        Iterable<NotificationEventWithMetadata<NotificationEvent>> notifications = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>of();
         for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
             if (queueName != null && !queueName.equals(notificationQueue.getQueueName())) {
                 continue;
@@ -401,75 +431,76 @@ public class AdminResource extends JaxRsResourceBase {
                 continue;
             }
 
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationsForQueue;
             if (includeInProcessing) {
                 if (accountRecordId != null) {
-                    notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureOrInProcessingNotificationForSearchKeys(accountRecordId, tenantRecordId));
                 } else {
-                    notificationsForQueue = notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
                 }
             } else {
                 if (accountRecordId != null) {
-                    notificationsForQueue = notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureNotificationForSearchKeys(accountRecordId, tenantRecordId));
                 } else {
-                    notificationsForQueue = notificationQueue.getFutureNotificationForSearchKey2(tenantRecordId);
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notifications,
+                                                                                                       notificationQueue.getFutureNotificationForSearchKey2(maxEffectiveDate, tenantRecordId));
                 }
             }
 
-            notifications.addAll(notificationsForQueue);
-
             if (includeHistory) {
                 if (accountRecordId != null) {
-                    notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId));
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKeys(accountRecordId, tenantRecordId),
+                                                                                                       notifications);
                 } else {
-                    notifications.addAll(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId));
+                    notifications = Iterables.<NotificationEventWithMetadata<NotificationEvent>>concat(notificationQueue.getHistoricalNotificationForSearchKey2(minEffectiveDate, tenantRecordId),
+                                                                                                       notifications);
                 }
             }
         }
 
-        return Ordering.<NotificationEventWithMetadata<NotificationEvent>>from(new Comparator<NotificationEventWithMetadata<NotificationEvent>>() {
-            @Override
-            public int compare(final NotificationEventWithMetadata<NotificationEvent> o1, final NotificationEventWithMetadata<NotificationEvent> o2) {
-                final int effectiveDateComparison = o1.getEffectiveDate().compareTo(o2.getEffectiveDate());
-                return effectiveDateComparison == 0 ? o1.getRecordId().compareTo(o2.getRecordId()) : effectiveDateComparison;
-            }
-        }).sortedCopy(notifications);
+        // Note: entries are properly ordered by queue, but not cross queues unfortunately
+        return notifications;
     }
 
     private Iterable<BusEventWithMetadata<BusEvent>> getBusEvents(final boolean includeInProcessing,
                                                                   final boolean includeHistory,
                                                                   @Nullable final DateTime minCreatedDate,
+                                                                  @Nullable final DateTime maxCreatedDate,
                                                                   @Nullable final Long accountRecordId,
                                                                   final Long tenantRecordId) {
-        final Collection<BusEventWithMetadata<BusEvent>> busEvents = new LinkedList<BusEventWithMetadata<BusEvent>>();
+        Iterable<BusEventWithMetadata<BusEvent>> busEvents = ImmutableList.<BusEventWithMetadata<BusEvent>>of();
         if (includeInProcessing) {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableOrInProcessingBusEventsForSearchKeys(accountRecordId, tenantRecordId));
             } else {
-                busEvents.addAll(persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
             }
         } else {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableBusEventsForSearchKeys(accountRecordId, tenantRecordId));
             } else {
-                busEvents.addAll(persistentBus.getAvailableBusEventsForSearchKey2(tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(busEvents,
+                                                                             persistentBus.getAvailableBusEventsForSearchKey2(maxCreatedDate, tenantRecordId));
             }
         }
 
         if (includeHistory) {
             if (accountRecordId != null) {
-                busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKeys(accountRecordId, tenantRecordId),
+                                                                             busEvents);
             } else {
-                busEvents.addAll(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId));
+                busEvents = Iterables.<BusEventWithMetadata<BusEvent>>concat(persistentBus.getHistoricalBusEventsForSearchKey2(minCreatedDate, tenantRecordId),
+                                                                             busEvents);
             }
         }
 
-        return Ordering.<BusEventWithMetadata<BusEvent>>from(new Comparator<BusEventWithMetadata<BusEvent>>() {
-            @Override
-            public int compare(final BusEventWithMetadata<BusEvent> o1, final BusEventWithMetadata<BusEvent> o2) {
-                return o1.getRecordId().compareTo(o2.getRecordId());
-            }
-        }).sortedCopy(busEvents);
+        // Note: entries are properly ordered by queue, but not cross queues unfortunately
+        return busEvents;
     }
 
     private class BusEventWithRichMetadata extends BusEventWithMetadata<BusEvent> {
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/ComboPaymentResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/ComboPaymentResource.java
index b2299ad..1dc5bdf 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/ComboPaymentResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/ComboPaymentResource.java
@@ -79,7 +79,13 @@ public abstract class ComboPaymentResource extends JaxRsResourceBase {
         return accountUserApi.createAccount(accountJson.toAccount(null), callContext);
     }
 
-    protected UUID getOrCreatePaymentMethod(final Account account, final PaymentMethodJson paymentMethodJson, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) throws PaymentApiException {
+    protected UUID getOrCreatePaymentMethod(final Account account, @Nullable final PaymentMethodJson paymentMethodJson, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) throws PaymentApiException {
+
+        // No info about payment method was passed, we default to null payment Method ID (which is allowed to be overridden in payment control plugins)
+        if (paymentMethodJson == null || paymentMethodJson.getPluginName() == null) {
+            return null;
+        }
+
         // Get all payment methods for account
         final List<PaymentMethod> accountPaymentMethods = paymentApi.getAccountPaymentMethods(account.getId(), false, ImmutableList.<PluginProperty>of(), callContext);
 
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
index c61ada1..2a3da40 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/JaxRsResourceBase.java
@@ -314,18 +314,28 @@ public abstract class JaxRsResourceBase implements JaxrsResource {
         final StreamingOutput json = new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException, WebApplicationException {
-                final JsonGenerator generator = mapper.getFactory().createJsonGenerator(output);
-                generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
-
-                generator.writeStartArray();
-                for (final E entity : entities) {
-                    final J asJson = toJson.apply(entity);
-                    if (asJson != null) {
-                        generator.writeObject(asJson);
+                final Iterator<E> iterator = entities.iterator();
+
+                try {
+                    final JsonGenerator generator = mapper.getFactory().createGenerator(output);
+                    generator.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
+
+                    generator.writeStartArray();
+                    while (iterator.hasNext()) {
+                        final E entity = iterator.next();
+                        final J asJson = toJson.apply(entity);
+                        if (asJson != null) {
+                            generator.writeObject(asJson);
+                        }
+                    }
+                    generator.writeEndArray();
+                    generator.close();
+                } finally {
+                    // In case the client goes away (IOException), make sure to close the underlying DB connection
+                    while (iterator.hasNext()) {
+                        iterator.next();
                     }
                 }
-                generator.writeEndArray();
-                generator.close();
             }
         };
 
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
index 9e1f704..261238a 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/TestResource.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2015 Groupon, Inc
- * Copyright 2014-2015 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,9 +18,6 @@
 
 package org.killbill.billing.jaxrs.resources;
 
-import java.util.Collection;
-import java.util.List;
-
 import javax.inject.Inject;
 import javax.servlet.ServletRequest;
 import javax.servlet.http.HttpServletRequest;
@@ -61,9 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -237,30 +232,27 @@ public class TestResource extends JaxRsResourceBase {
     }
 
     private boolean areAllNotificationsProcessed(final Long tenantRecordId) {
-        final Collection<NotificationQueue> filtered = ImmutableList.<NotificationQueue>copyOf(Collections2.<NotificationQueue>filter(notificationQueueService.getNotificationQueues(),
-                                                                                                                                      new Predicate<NotificationQueue>() {
-                                                                                                                                          @Override
-                                                                                                                                          public boolean apply(final NotificationQueue notificationQueue) {
-                                                                                                                                              for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(tenantRecordId)) {
-                                                                                                                                                  if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
-                                                                                                                                                      return true;
-                                                                                                                                                  }
-                                                                                                                                              }
-                                                                                                                                              return false;
-                                                                                                                                          }
-                                                                                                                                      }));
-        if (!filtered.isEmpty()) {
-            log.info("TestResource: {} queue(s) with more notification(s) to process", filtered.size());
+        int nbNotifications = 0;
+        for (final NotificationQueue notificationQueue : notificationQueueService.getNotificationQueues()) {
+            for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueue.getFutureOrInProcessingNotificationForSearchKey2(null, tenantRecordId)) {
+                if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
+                    nbNotifications += 1;
+                }
+            }
+        }
+        if (nbNotifications != 0) {
+            log.info("TestResource: {} queue(s) with more notification(s) to process", nbNotifications);
         }
-        return filtered.isEmpty();
+        return nbNotifications == 0;
     }
 
     private boolean areAllBusEventsProcessed(final Long tenantRecordId) {
-        final List<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(tenantRecordId);
-        if (!availableBusEventForSearchKey2.isEmpty()) {
-            log.info("TestResource: at least {} more bus event(s) to process", availableBusEventForSearchKey2.size());
+        final Iterable<BusEventWithMetadata<BusEvent>> availableBusEventForSearchKey2 = persistentBus.getAvailableOrInProcessingBusEventsForSearchKey2(null, tenantRecordId);
+        final int nbBusEvents = Iterables.<BusEventWithMetadata<BusEvent>>size(availableBusEventForSearchKey2);
+        if (nbBusEvents != 0) {
+            log.info("TestResource: at least {} more bus event(s) to process", nbBusEvents);
         }
-        return availableBusEventForSearchKey2.isEmpty();
+        return nbBusEvents == 0;
     }
 
     private ClockMock getClockMock() {

junction/pom.xml 2(+1 -1)

diff --git a/junction/pom.xml b/junction/pom.xml
index a8335a3..6caedca 100644
--- a/junction/pom.xml
+++ b/junction/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-junction</artifactId>

NEWS 3(+3 -0)

diff --git a/NEWS b/NEWS
index d841538..1ba9b8e 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,6 @@
+0.18.4
+    See https://github.com/killbill/killbill/releases/tag/killbill-0.18.4
+
 0.18.3
     See https://github.com/killbill/killbill/releases/tag/killbill-0.18.3
 

overdue/pom.xml 2(+1 -1)

diff --git a/overdue/pom.xml b/overdue/pom.xml
index 4f3e728..4cd0b83 100644
--- a/overdue/pom.xml
+++ b/overdue/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-overdue</artifactId>
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
index 463d08c..d06c167 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/DefaultOverduePosterBase.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,7 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
@@ -66,8 +65,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
                 public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                     // Check if we already have notifications for that key
                     final Class<T> clazz = (Class<T>) notificationKey.getClass();
-                    final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
-                                                                                                                                           clazz, context);
+                    final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue,
+                                                                                                                                         clazz, context);
 
                     final boolean shouldInsertNewNotification = cleanupFutureNotificationsFormTransaction(entitySqlDaoWrapperFactory, futureNotifications, futureNotificationTime, overdueQueue);
                     if (shouldInsertNewNotification) {
@@ -92,8 +91,8 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
             transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
                 @Override
                 public Void inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                    final Collection<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
-                                                                                                                                           clazz, context);
+                    final Iterable<NotificationEventWithMetadata<T>> futureNotifications = getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, checkOverdueQueue,
+                                                                                                                                         clazz, context);
                     for (final NotificationEventWithMetadata<T> notification : futureNotifications) {
                         checkOverdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), notification.getRecordId());
                     }
@@ -107,15 +106,15 @@ public abstract class DefaultOverduePosterBase implements OverduePoster {
     }
 
     @VisibleForTesting
-    <T extends OverdueCheckNotificationKey> Collection<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                                                       final NotificationQueue checkOverdueQueue,
-                                                                                                                                       final Class<T> clazz,
-                                                                                                                                       final InternalCallContext context) {
+    <T extends OverdueCheckNotificationKey> Iterable<NotificationEventWithMetadata<T>> getFutureNotificationsForAccountInTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
+                                                                                                                                     final NotificationQueue checkOverdueQueue,
+                                                                                                                                     final Class<T> clazz,
+                                                                                                                                     final InternalCallContext context) {
         return checkOverdueQueue.getFutureNotificationFromTransactionForSearchKeys(context.getAccountRecordId(), context.getTenantRecordId(), entitySqlDaoWrapperFactory.getHandle().getConnection());
     }
 
     protected abstract <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                                 final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                                 final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                                  final DateTime futureNotificationTime, final NotificationQueue overdueQueue);
 
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
index d6b7a44..124fe69 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueAsyncBusPoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
-
 import org.joda.time.DateTime;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -31,6 +29,7 @@ import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.skife.jdbi.v2.IDBI;
 
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
@@ -44,12 +43,13 @@ public class OverdueAsyncBusPoster extends DefaultOverduePosterBase {
 
     @Override
     protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                        final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                        final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                         final DateTime futureNotificationTime,
                                                                                                         final NotificationQueue overdueQueue) {
         // If we already have notification for that account we don't insert the new one
         // Note that this is slightly incorrect because we could for instance already have a REFRESH and insert a CLEAR, but if that were the case,
         // if means overdue state would change very rapidly and the behavior would anyway be non deterministic
-        return futureNotifications.isEmpty();
+        // Note: don't use isEmpty() to go through all results to close the connection
+        return Iterables.<NotificationEventWithMetadata<T>>size(futureNotifications) == 0;
     }
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
index 1119e53..25d7eff 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/notification/OverdueCheckPoster.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -18,8 +18,6 @@
 
 package org.killbill.billing.overdue.notification;
 
-import java.util.Collection;
-
 import org.joda.time.DateTime;
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
@@ -44,32 +42,31 @@ public class OverdueCheckPoster extends DefaultOverduePosterBase {
 
     @Override
     protected <T extends OverdueCheckNotificationKey> boolean cleanupFutureNotificationsFormTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory,
-                                                                                                        final Collection<NotificationEventWithMetadata<T>> futureNotifications,
+                                                                                                        final Iterable<NotificationEventWithMetadata<T>> futureNotifications,
                                                                                                         final DateTime futureNotificationTime, final NotificationQueue overdueQueue) {
 
         boolean shouldInsertNewNotification = true;
-        if (!futureNotifications.isEmpty()) {
+        int minIndexToDeleteFrom = 0;
+        int index = 0;
+        for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
             // Results are ordered by effective date asc
-            final DateTime earliestExistingNotificationDate = futureNotifications.iterator().next().getEffectiveDate();
-
-            final int minIndexToDeleteFrom;
-            if (earliestExistingNotificationDate.isBefore(futureNotificationTime)) {
-                // We don't have to insert a new one. For sanity, delete any other future notification
-                minIndexToDeleteFrom = 1;
-                shouldInsertNewNotification = false;
-            } else {
-                // We win - we are before any other already recorded. Delete all others.
-                minIndexToDeleteFrom = 0;
+            if (index == 0) {
+                if (cur.getEffectiveDate().isBefore(futureNotificationTime)) {
+                    // We don't have to insert a new one. For sanity, delete any other future notification
+                    minIndexToDeleteFrom = 1;
+                    shouldInsertNewNotification = false;
+                } else {
+                    // We win - we are before any other already recorded. Delete all others.
+                    minIndexToDeleteFrom = 0;
+                }
             }
 
-            int index = 0;
-            for (final NotificationEventWithMetadata<T> cur : futureNotifications) {
-                if (minIndexToDeleteFrom <= index) {
-                    overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
-                }
-                index++;
+            if (minIndexToDeleteFrom <= index) {
+                overdueQueue.removeNotificationFromTransaction(entitySqlDaoWrapperFactory.getHandle().getConnection(), cur.getRecordId());
             }
+            index++;
         }
+
         return shouldInsertNewNotification;
     }
 }
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapper.java b/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapper.java
index 0c1e151..56db945 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapper.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapper.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -19,7 +19,7 @@
 package org.killbill.billing.overdue.wrapper;
 
 import org.joda.time.DateTime;
-import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.account.api.Account;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.entitlement.api.BlockingState;
@@ -52,7 +52,7 @@ public class OverdueWrapper {
     // Should we introduce a config?
     private static final int MAX_LOCK_RETRIES = 50;
 
-    private final ImmutableAccountData overdueable;
+    private final Account overdueable;
     private final BlockingInternalApi api;
     private final GlobalLocker locker;
     private final Clock clock;
@@ -61,7 +61,7 @@ public class OverdueWrapper {
     private final OverdueStateApplicator overdueStateApplicator;
     private final InternalCallContextFactory internalCallContextFactory;
 
-    public OverdueWrapper(final ImmutableAccountData overdueable,
+    public OverdueWrapper(final Account overdueable,
                           final BlockingInternalApi api,
                           final OverdueStateSet overdueStateSet,
                           final GlobalLocker locker,
diff --git a/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapperFactory.java b/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapperFactory.java
index aa43f69..eb21b00 100644
--- a/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapperFactory.java
+++ b/overdue/src/main/java/org/killbill/billing/overdue/wrapper/OverdueWrapperFactory.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -21,9 +21,9 @@ package org.killbill.billing.overdue.wrapper;
 import java.util.UUID;
 
 import org.joda.time.Period;
+import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.account.api.ImmutableAccountData;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.junction.BlockingInternalApi;
 import org.killbill.billing.overdue.api.OverdueApiException;
@@ -76,13 +76,13 @@ public class OverdueWrapperFactory {
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
-    public OverdueWrapper createOverdueWrapperFor(final ImmutableAccountData blockable, final InternalTenantContext context) throws OverdueException {
+    public OverdueWrapper createOverdueWrapperFor(final Account blockable, final InternalTenantContext context) throws OverdueException {
         return new OverdueWrapper(blockable, api, getOverdueStateSet(context), locker, clock, billingStateCalculator, overdueStateApplicator, internalCallContextFactory);
     }
 
     public OverdueWrapper createOverdueWrapperFor(final UUID id, final InternalTenantContext context) throws OverdueException {
         try {
-            final ImmutableAccountData account = accountApi.getImmutableAccountDataById(id, context);
+            final Account account = accountApi.getAccountById(id, context);
             return new OverdueWrapper(account, api, getOverdueStateSet(context), locker, clock, billingStateCalculator, overdueStateApplicator, internalCallContextFactory);
         } catch (final AccountApiException e) {
             throw new OverdueException(e);
diff --git a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
index 03e91fa..3f8cd00 100644
--- a/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
+++ b/overdue/src/test/java/org/killbill/billing/overdue/notification/TestDefaultOverdueCheckPoster.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -17,24 +19,26 @@
 package org.killbill.billing.overdue.notification;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 
 import org.joda.time.DateTime;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
 import org.killbill.billing.account.api.Account;
-import org.killbill.notificationq.api.NotificationEventWithMetadata;
-import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
 import org.killbill.billing.overdue.service.DefaultOverdueService;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
 import org.killbill.billing.util.jackson.ObjectMapper;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedDB {
 
@@ -68,7 +72,7 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         insertOverdueCheckAndVerifyQueueContent(overdueable, 15, 5);
 
         // Verify the final content of the queue
-        Assert.assertEquals(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size(), 1);
+        Assert.assertEquals(Iterables.<NotificationEventWithMetadata>size(overdueQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())), 1);
     }
 
     private void insertOverdueCheckAndVerifyQueueContent(final Account account, final int nbDaysInFuture, final int expectedNbDaysInFuture) throws IOException {
@@ -77,18 +81,18 @@ public class TestDefaultOverdueCheckPoster extends OverdueTestSuiteWithEmbeddedD
         final OverdueCheckNotificationKey notificationKey = new OverdueCheckNotificationKey(account.getId());
         checkPoster.insertOverdueNotification(account.getId(), futureNotificationTime, OverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE, notificationKey, internalCallContext);
 
-        final Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
+        final List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> notificationsForKey = getNotificationsForOverdueable(account);
         Assert.assertEquals(notificationsForKey.size(), 1);
-        final NotificationEventWithMetadata nm = notificationsForKey.iterator().next();
+        final NotificationEventWithMetadata nm = notificationsForKey.get(0);
         Assert.assertEquals(nm.getEvent(), notificationKey);
         Assert.assertEquals(nm.getEffectiveDate(), testReferenceTime.plusDays(expectedNbDaysInFuture));
     }
 
-    private Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
-        return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
+    private List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> getNotificationsForOverdueable(final Account account) {
+        return entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<List<NotificationEventWithMetadata<OverdueCheckNotificationKey>>>() {
             @Override
-            public Collection<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
-                return ((OverdueCheckPoster)checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext);
+            public List<NotificationEventWithMetadata<OverdueCheckNotificationKey>> inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
+                return ImmutableList.<NotificationEventWithMetadata<OverdueCheckNotificationKey>>copyOf(((OverdueCheckPoster) checkPoster).getFutureNotificationsForAccountInTransaction(entitySqlDaoWrapperFactory, overdueQueue, OverdueCheckNotificationKey.class, internalCallContext));
             }
         });
     }
diff --git a/overdue/src/test/java/org/killbill/billing/overdue/TestOverdueHelper.java b/overdue/src/test/java/org/killbill/billing/overdue/TestOverdueHelper.java
index f805d5d..135e86f 100644
--- a/overdue/src/test/java/org/killbill/billing/overdue/TestOverdueHelper.java
+++ b/overdue/src/test/java/org/killbill/billing/overdue/TestOverdueHelper.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -26,9 +26,9 @@ import java.util.UUID;
 import org.joda.time.DateTimeZone;
 import org.joda.time.LocalDate;
 import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.Account;
 import org.killbill.billing.account.api.AccountApiException;
 import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.account.api.ImmutableAccountData;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.entitlement.api.BlockingState;
 import org.killbill.billing.invoice.api.Invoice;
@@ -121,13 +121,12 @@ public class TestOverdueHelper {
         Assert.assertEquals(result.isBlockBilling(), state.isDisableEntitlementAndChangesBlocked());
     }
 
-    public ImmutableAccountData createImmutableAccountData(final LocalDate dateOfLastUnPaidInvoice) throws SubscriptionBaseApiException, AccountApiException {
-
+    public Account createAccount(final LocalDate dateOfLastUnPaidInvoice) throws SubscriptionBaseApiException, AccountApiException {
         final UUID accountId = UUID.randomUUID();
-        final ImmutableAccountData account = Mockito.mock(ImmutableAccountData.class);
+        final Account account = Mockito.mock(Account.class);
         Mockito.when(account.getId()).thenReturn(accountId);
         Mockito.when(account.getTimeZone()).thenReturn(DateTimeZone.UTC);
-        Mockito.when(accountInternalApi.getImmutableAccountDataById(Mockito.eq(account.getId()), Mockito.<InternalTenantContext>any())).thenReturn(account);
+        Mockito.when(accountInternalApi.getAccountById(Mockito.eq(account.getId()), Mockito.<InternalTenantContext>any())).thenReturn(account);
 
         final Invoice invoice = Mockito.mock(Invoice.class);
         Mockito.when(invoice.getInvoiceDate()).thenReturn(dateOfLastUnPaidInvoice);
diff --git a/overdue/src/test/java/org/killbill/billing/overdue/wrapper/TestOverdueWrapper.java b/overdue/src/test/java/org/killbill/billing/overdue/wrapper/TestOverdueWrapper.java
index f495016..7b5c9a3 100644
--- a/overdue/src/test/java/org/killbill/billing/overdue/wrapper/TestOverdueWrapper.java
+++ b/overdue/src/test/java/org/killbill/billing/overdue/wrapper/TestOverdueWrapper.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2016 Groupon, Inc
+ * Copyright 2014-2016 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -19,8 +21,7 @@ package org.killbill.billing.overdue.wrapper;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 
-import org.killbill.billing.account.api.ImmutableAccountData;
-import org.killbill.billing.junction.DefaultBlockingState;
+import org.killbill.billing.account.api.Account;
 import org.killbill.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
 import org.killbill.billing.overdue.api.OverdueState;
 import org.killbill.billing.overdue.caching.MockOverdueConfigCache;
@@ -44,24 +45,24 @@ public class TestOverdueWrapper extends OverdueTestSuiteWithEmbeddedDB {
         final DefaultOverdueConfig config = XMLLoader.getObjectFromStreamNoValidation(is, DefaultOverdueConfig.class);
         ((MockOverdueConfigCache) overdueConfigCache).loadOverwriteDefaultOverdueConfig(config);
 
-        ImmutableAccountData account;
+        Account account;
         OverdueWrapper wrapper;
         OverdueState state;
 
         state = config.getOverdueStatesAccount().findState("OD1");
-        account = testOverdueHelper.createImmutableAccountData(clock.getUTCToday().minusDays(31));
+        account = testOverdueHelper.createAccount(clock.getUTCToday().minusDays(31));
         wrapper = overdueWrapperFactory.createOverdueWrapperFor(account, internalCallContext);
         wrapper.refresh(clock.getUTCNow(), internalCallContext);
         testOverdueHelper.checkStateApplied(state);
 
         state = config.getOverdueStatesAccount().findState("OD2");
-        account = testOverdueHelper.createImmutableAccountData(clock.getUTCToday().minusDays(41));
+        account = testOverdueHelper.createAccount(clock.getUTCToday().minusDays(41));
         wrapper = overdueWrapperFactory.createOverdueWrapperFor(account, internalCallContext);
         wrapper.refresh(clock.getUTCNow(), internalCallContext);
         testOverdueHelper.checkStateApplied(state);
 
         state = config.getOverdueStatesAccount().findState("OD3");
-        account = testOverdueHelper.createImmutableAccountData(clock.getUTCToday().minusDays(51));
+        account = testOverdueHelper.createAccount(clock.getUTCToday().minusDays(51));
         wrapper = overdueWrapperFactory.createOverdueWrapperFor(account, internalCallContext);
         wrapper.refresh(clock.getUTCNow(), internalCallContext);
         testOverdueHelper.checkStateApplied(state);
@@ -70,14 +71,14 @@ public class TestOverdueWrapper extends OverdueTestSuiteWithEmbeddedDB {
     @Test(groups = "slow")
     public void testWrapperNoConfig() throws Exception {
 
-        final ImmutableAccountData account;
+        final Account account;
         final OverdueWrapper wrapper;
         final OverdueState state;
 
         final InputStream is = new ByteArrayInputStream(testOverdueHelper.getConfigXml().getBytes());
         final DefaultOverdueConfig config = XMLLoader.getObjectFromStreamNoValidation(is, DefaultOverdueConfig.class);
         state = config.getOverdueStatesAccount().findState(OverdueWrapper.CLEAR_STATE_NAME);
-        account = testOverdueHelper.createImmutableAccountData(clock.getUTCToday().minusDays(31));
+        account = testOverdueHelper.createAccount(clock.getUTCToday().minusDays(31));
         wrapper = overdueWrapperFactory.createOverdueWrapperFor(account, internalCallContext);
         final OverdueState result = wrapper.refresh(clock.getUTCNow(), internalCallContext);
 

payment/pom.xml 2(+1 -1)

diff --git a/payment/pom.xml b/payment/pom.xml
index 3841404..0fc7d88 100644
--- a/payment/pom.xml
+++ b/payment/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-payment</artifactId>
diff --git a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
index 7a0c2bf..d52b6da 100644
--- a/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/api/DefaultPaymentApi.java
@@ -125,7 +125,6 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
         }
 
         checkNotNullParameter(account, "account");
-        checkNotNullParameter(paymentMethodId, "paymentMethodId");
         if (paymentId == null) {
             checkNotNullParameter(amount, "amount");
             checkNotNullParameter(currency, "currency");
@@ -565,6 +564,7 @@ public class DefaultPaymentApi extends DefaultApiBase implements PaymentApi {
             checkNotNullParameter(amount, "amount");
             checkNotNullParameter(currency, "currency");
         }
+        checkNotNullParameter(paymentMethodId, "paymentMethodId");
         checkNotNullParameter(properties, "plugin properties");
         checkExternalKeyLength(paymentTransactionExternalKey);
 
diff --git a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
index f4a0491..b35e99a 100644
--- a/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/org/killbill/billing/payment/core/PaymentProcessor.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -408,14 +408,14 @@ public class PaymentProcessor extends ProcessorBase {
     public void cancelScheduledPaymentTransaction(final UUID lastPaymentAttemptId, final InternalCallContext internalCallContext) throws PaymentApiException {
         try {
             final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
                     retryQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
             for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
                 if (((PaymentRetryNotificationKey) notificationEvent.getEvent()).getAttemptId().equals(lastPaymentAttemptId)) {
                     retryQueue.removeNotification(notificationEvent.getRecordId());
-                    break;
                 }
+                // Go through all results to close the connection
             }
         } catch (final NoSuchNotificationQueue noSuchNotificationQueue) {
             log.error("ERROR Loading Notification Queue - " + noSuchNotificationQueue.getMessage());
@@ -767,7 +767,7 @@ public class PaymentProcessor extends ProcessorBase {
         // Get Future Payment Attempts from Notification Queue and add them to the list
         try {
             final NotificationQueue retryQueue = notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, DefaultRetryService.QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> notificationEventWithMetadatas =
                     retryQueue.getFutureNotificationForSearchKeys(internalTenantContext.getAccountRecordId(), internalTenantContext.getTenantRecordId());
 
             for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationEventWithMetadatas) {
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
index 020ff49..4d00d40 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/DefaultPaymentDao.java
@@ -53,6 +53,7 @@ import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Entity;
 import org.killbill.billing.util.entity.Pagination;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
 import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
@@ -150,8 +151,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                       return sqlDao.getCountByStateNameAcrossTenants(stateName, createdBefore);
                                                   }
                                                   @Override
-                                                  public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit);
+                                                  public Iterator<PaymentAttemptModelDao> build(final PaymentAttemptSqlDao sqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return sqlDao.getByStateNameAcrossTenants(stateName, createdBefore, offset, limit, ordering.toString());
                                                   }
                                               },
                                               offset,
@@ -200,8 +201,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                   }
 
                                                   @Override
-                                                  public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit);
+                                                  public Iterator<PaymentTransactionModelDao> build(final TransactionSqlDao sqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return sqlDao.getByTransactionStatusPriorDateAcrossTenants(allTransactionStatus, createdBefore, createdAfter, offset, limit, ordering.toString());
                                                   }
                                               },
                                               offset,
@@ -240,8 +241,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                   }
 
                                                   @Override
-                                                  public Iterator<PaymentModelDao> build(final PaymentSqlDao paymentSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      final Iterator<PaymentModelDao> result = paymentSqlDao.getByPluginName(pluginName, offset, limit, context);
+                                                  public Iterator<PaymentModelDao> build(final PaymentSqlDao paymentSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      final Iterator<PaymentModelDao> result = paymentSqlDao.getByPluginName(pluginName, offset, limit, ordering.toString(), context);
                                                       return result;
                                                   }
                                               },
@@ -265,8 +266,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                   }
 
                                                   @Override
-                                                  public Iterator<PaymentModelDao> build(final PaymentSqlDao paymentSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return !paymentStates.isEmpty() ? paymentSqlDao.searchByState(paymentStates, offset, limit, context) : paymentSqlDao.search(searchKey, likeSearchKey, offset, limit, context);
+                                                  public Iterator<PaymentModelDao> build(final PaymentSqlDao paymentSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return !paymentStates.isEmpty() ? paymentSqlDao.searchByState(paymentStates, offset, limit, ordering.toString(), context) : paymentSqlDao.search(searchKey, likeSearchKey, offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
@@ -492,8 +493,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                   }
 
                                                   @Override
-                                                  public Iterator<PaymentMethodModelDao> build(final PaymentMethodSqlDao paymentMethodSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return paymentMethodSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                  public Iterator<PaymentMethodModelDao> build(final PaymentMethodSqlDao paymentMethodSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return paymentMethodSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
@@ -511,8 +512,8 @@ public class DefaultPaymentDao extends EntityDaoBase<PaymentModelDao, Payment, P
                                                   }
 
                                                   @Override
-                                                  public Iterator<PaymentMethodModelDao> build(final PaymentMethodSqlDao paymentMethodSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return paymentMethodSqlDao.getByPluginName(pluginName, offset, limit, context);
+                                                  public Iterator<PaymentMethodModelDao> build(final PaymentMethodSqlDao paymentMethodSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return paymentMethodSqlDao.getByPluginName(pluginName, offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
index 4b25a00..d9aef5d 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.java
@@ -31,6 +31,7 @@ import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
 
 @EntitySqlDaoStringTemplate
 public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDao, Entity> {
@@ -66,6 +67,7 @@ public interface PaymentAttemptSqlDao extends EntitySqlDao<PaymentAttemptModelDa
     Iterator<PaymentAttemptModelDao> getByStateNameAcrossTenants(@Bind("stateName") final String stateName,
                                                                  @Bind("createdBeforeDate") final Date createdBeforeDate,
                                                                  @Bind("offset") final Long offset,
-                                                                 @Bind("rowCount") final Long rowCount);
+                                                                 @Bind("rowCount") final Long rowCount,
+                                                                 @Define("ordering") final String ordering);
 
 }
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
index ec7cc3d..da14f3e 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentMethodSqlDao.java
@@ -34,6 +34,7 @@ import org.killbill.billing.util.audit.ChangeType;
 import org.killbill.billing.util.entity.dao.Audited;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
 import org.killbill.billing.util.entity.dao.EntitySqlDaoStringTemplate;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
 
 @EntitySqlDaoStringTemplate
 public interface PaymentMethodSqlDao extends EntitySqlDao<PaymentMethodModelDao, PaymentMethod> {
@@ -69,6 +70,7 @@ public interface PaymentMethodSqlDao extends EntitySqlDao<PaymentMethodModelDao,
     public Iterator<PaymentMethodModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
                                                            @Bind("offset") final Long offset,
                                                            @Bind("rowCount") final Long rowCount,
+                                                           @Define("ordering") final String ordering,
                                                            @BindBean final InternalTenantContext context);
 
     @SqlQuery
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
index 23dcd3f..2be09a6 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/PaymentSqlDao.java
@@ -33,6 +33,7 @@ import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
 
 @EntitySqlDaoStringTemplate
 public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
@@ -71,6 +72,7 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
     public Iterator<PaymentModelDao> searchByState(@PaymentStateCollectionBinder final Collection<String> paymentStates,
                                                    @Bind("offset") final Long offset,
                                                    @Bind("rowCount") final Long rowCount,
+                                                   @Define("ordering") final String ordering,
                                                    @BindBean final InternalTenantContext context);
 
     @SqlQuery
@@ -82,7 +84,8 @@ public interface PaymentSqlDao extends EntitySqlDao<PaymentModelDao, Payment> {
     public Iterator<PaymentModelDao> getByPluginName(@Bind("pluginName") final String pluginName,
                                                      @Bind("offset") final Long offset,
                                                      @Bind("rowCount") final Long rowCount,
-                                                           @BindBean final InternalTenantContext context);
+                                                     @Define("ordering") final String ordering,
+                                                     @BindBean final InternalTenantContext context);
 
     @SqlQuery
     public Long getCountByPluginName(@Bind("pluginName") final String pluginName,
diff --git a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
index d85ac3a..4f687aa 100644
--- a/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
+++ b/payment/src/main/java/org/killbill/billing/payment/dao/TransactionSqlDao.java
@@ -35,6 +35,7 @@ import org.skife.jdbi.v2.sqlobject.Bind;
 import org.skife.jdbi.v2.sqlobject.BindBean;
 import org.skife.jdbi.v2.sqlobject.SqlQuery;
 import org.skife.jdbi.v2.sqlobject.SqlUpdate;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
 
 @EntitySqlDaoStringTemplate
 public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelDao, PaymentTransaction> {
@@ -61,10 +62,11 @@ public interface TransactionSqlDao extends EntitySqlDao<PaymentTransactionModelD
 
     @SqlQuery
     Iterator<PaymentTransactionModelDao> getByTransactionStatusPriorDateAcrossTenants(@TransactionStatusCollectionBinder final Collection<String> statuses,
-                                                                                  @Bind("createdBeforeDate") final Date createdBeforeDate,
-                                                                                  @Bind("createdAfterDate") final Date createdAfterDate,
-                                                                                  @Bind("offset") final Long offset,
-                                                                                  @Bind("rowCount") final Long rowCount);
+                                                                                      @Bind("createdBeforeDate") final Date createdBeforeDate,
+                                                                                      @Bind("createdAfterDate") final Date createdAfterDate,
+                                                                                      @Bind("offset") final Long offset,
+                                                                                      @Bind("rowCount") final Long rowCount,
+                                                                                      @Define("ordering") final String ordering);
 
     @SqlQuery
     public List<PaymentTransactionModelDao> getByPaymentId(@Bind("paymentId") final UUID paymentId,
diff --git a/payment/src/main/java/org/killbill/billing/payment/invoice/InvoicePaymentControlPluginApi.java b/payment/src/main/java/org/killbill/billing/payment/invoice/InvoicePaymentControlPluginApi.java
index 81185d4..d027222 100644
--- a/payment/src/main/java/org/killbill/billing/payment/invoice/InvoicePaymentControlPluginApi.java
+++ b/payment/src/main/java/org/killbill/billing/payment/invoice/InvoicePaymentControlPluginApi.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -32,8 +32,8 @@ import org.joda.time.DateTime;
 import org.killbill.billing.ErrorCode;
 import org.killbill.billing.ObjectType;
 import org.killbill.billing.account.api.AccountApiException;
+import org.killbill.billing.account.api.AccountData;
 import org.killbill.billing.account.api.AccountInternalApi;
-import org.killbill.billing.account.api.ImmutableAccountData;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.callcontext.InternalTenantContext;
 import org.killbill.billing.catalog.api.Currency;
@@ -322,8 +322,8 @@ public final class InvoicePaymentControlPluginApi implements PaymentControlPlugi
                 return new DefaultPriorPaymentControlResult(true);
             }
 
-            // get immutable account and check if it is child and payment is delegated to parent => abort
-            final ImmutableAccountData accountData = accountApi.getImmutableAccountDataById(invoice.getAccountId(), internalContext);
+            // Get account and check if it is child and payment is delegated to parent => abort
+            final AccountData accountData = accountApi.getAccountById(invoice.getAccountId(), internalContext);
             if ((accountData != null) && (accountData.getParentAccountId() != null) && accountData.isPaymentDelegatedToParent()) {
                 return new DefaultPriorPaymentControlResult(true);
             }
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
index d8f6437..d8699fe 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentAttemptSqlDao.sql.stg
@@ -65,14 +65,14 @@ where payment_external_key = :paymentExternalKey
 >>
 
 /* Does not include tenant info, global */
-getByStateNameAcrossTenants() ::= <<
+getByStateNameAcrossTenants(ordering) ::= <<
 select
 <allTableFields("")>
 from <tableName()>
 where state_name = :stateName
 and created_date \< :createdBeforeDate
 <andCheckSoftDeletionWithComma("")>
-<defaultOrderBy()>
+order by <recordIdField()> <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
index bece059..67c65f1 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentMethodSqlDao.sql.stg
@@ -101,14 +101,14 @@ searchQuery(prefix) ::= <<
   or <prefix>plugin_name like :likeSearchKey
 >>
 
-getByPluginName() ::= <<
+getByPluginName(ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
 where t.plugin_name = :pluginName
 <andCheckSoftDeletionWithComma("t.")>
 <AND_CHECK_TENANT("t.")>
-order by t.record_id
+order by t.record_id <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
index bf30477..b15a04b 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/PaymentSqlDao.sql.stg
@@ -82,7 +82,7 @@ searchQuery(prefix) ::= <<
   or <prefix>external_key like :likeSearchKey
 >>
 
-searchByState(states) ::= <<
+searchByState(states, ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
@@ -92,10 +92,10 @@ join (
   where state_name in (<states: {state | :state_<i0>}; separator="," >)
   <AND_CHECK_TENANT()>
   <andCheckSoftDeletionWithComma()>
-  order by <recordIdField()>
+  order by <recordIdField()> <ordering>
   limit :rowCount offset :offset
 ) optimization on <recordIdField("optimization.")> = <recordIdField("t.")>
-order by <recordIdField("t.")>
+order by <recordIdField("t.")> <ordering>
 ;
 >>
 
@@ -109,14 +109,14 @@ where t.state_name in (<states: {state | :state_<i0>}; separator="," >)
 ;
 >>
 
-getByPluginName() ::= <<
+getByPluginName(ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
 join payment_methods pm on pm.id = t.payment_method_id
 where pm.plugin_name = :pluginName
 <AND_CHECK_TENANT("t.")>
-order by t.record_id asc
+order by t.record_id <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
index c00aee0..1f19f0b 100644
--- a/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
+++ b/payment/src/main/resources/org/killbill/billing/payment/dao/TransactionSqlDao.sql.stg
@@ -83,14 +83,14 @@ where payment_id = :paymentId
 
 
 /* Does not include AND_CHECK_TENANT() since this is a global operation */
-getByTransactionStatusPriorDateAcrossTenants(statuses) ::= <<
+getByTransactionStatusPriorDateAcrossTenants(statuses, ordering) ::= <<
 select <allTableFields()>
 from <tableName()>
 where
 created_date >= :createdAfterDate
 and created_date \< :createdBeforeDate
 and transaction_status in (<statuses: {status | :status_<i0>}; separator="," >)
-<defaultOrderBy()>
+order by <recordIdField()> <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApiWithControl.java b/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApiWithControl.java
index 4ad4a59..545fcc4 100644
--- a/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApiWithControl.java
+++ b/payment/src/test/java/org/killbill/billing/payment/api/TestPaymentApiWithControl.java
@@ -94,7 +94,7 @@ public class TestPaymentApiWithControl extends PaymentTestSuiteWithEmbeddedDB {
     @Test(groups = "slow")
     public void testCreateAuthWithControl() throws PaymentApiException {
         final PaymentMethodPlugin paymentMethodInfo = new DefaultNoOpPaymentMethodPlugin(UUID.randomUUID().toString(), false, null);
-        final UUID newPaymentMethodId = paymentApi.addPaymentMethod(account, paymentMethodInfo.getExternalPaymentMethodId(), MockPaymentProviderPlugin.PLUGIN_NAME, false, paymentMethodInfo, ImmutableList.<PluginProperty>of(), callContext);
+        final UUID newPaymentMethodId = paymentApi.addPaymentMethod(account, null, MockPaymentProviderPlugin.PLUGIN_NAME, false, paymentMethodInfo, ImmutableList.<PluginProperty>of(), callContext);
         testPaymentControlPluginApi.setNewPaymentMethodId(newPaymentMethodId);
 
         final Payment payment = paymentApi.createAuthorizationWithPaymentControl(account, account.getPaymentMethodId(), null, BigDecimal.TEN, Currency.USD, UUID.randomUUID().toString(),
diff --git a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
index 5a137ac..191ed5c 100644
--- a/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
+++ b/payment/src/test/java/org/killbill/billing/payment/core/janitor/TestIncompletePaymentTransactionTaskWithDB.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -40,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuiteWithEmbeddedDB {
 
@@ -77,7 +78,7 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
         final JanitorNotificationKey notificationKey = new JanitorNotificationKey(transactionId, incompletePaymentTransactionTask.getClass().toString(), 1);
         final UUID userToken = UUID.randomUUID();
 
-        Assert.assertTrue(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).isEmpty());
+        Assert.assertTrue(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())));
 
         GlobalLock lock = null;
         try {
@@ -85,16 +86,17 @@ public class TestIncompletePaymentTransactionTaskWithDB extends PaymentTestSuite
 
             incompletePaymentTransactionTask.processNotification(notificationKey, userToken, internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
 
-            final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
-            Assert.assertFalse(futureNotifications.isEmpty());
-            Assert.assertEquals(futureNotifications.get(0).getUserToken(), userToken);
-            Assert.assertEquals(futureNotifications.get(0).getEvent().getClass(), JanitorNotificationKey.class);
-            final JanitorNotificationKey event = (JanitorNotificationKey) futureNotifications.get(0).getEvent();
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = incompletePaymentTransactionTask.janitorQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            Assert.assertFalse(Iterables.<NotificationEventWithMetadata<NotificationEvent>>isEmpty(futureNotifications));
+            final NotificationEventWithMetadata<NotificationEvent> notificationEventWithMetadata = ImmutableList.<NotificationEventWithMetadata<NotificationEvent>>copyOf(futureNotifications).get(0);
+            Assert.assertEquals(notificationEventWithMetadata.getUserToken(), userToken);
+            Assert.assertEquals(notificationEventWithMetadata.getEvent().getClass(), JanitorNotificationKey.class);
+            final JanitorNotificationKey event = (JanitorNotificationKey) notificationEventWithMetadata.getEvent();
             Assert.assertEquals(event.getUuidKey(), transactionId);
             Assert.assertEquals((int) event.getAttemptNumber(), 2);
 
             // Based on config "15s,1m,3m,1h,1d,1d,1d,1d,1d"
-            Assert.assertTrue(futureNotifications.get(0).getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
+            Assert.assertTrue(notificationEventWithMetadata.getEffectiveDate().compareTo(clock.getUTCNow().plusMinutes(1).plusSeconds(1)) < 0);
         } catch (final LockFailedException e) {
             Assert.fail();
         } finally {
diff --git a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
index b8d044b..0fcfb7f 100644
--- a/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
+++ b/payment/src/test/java/org/killbill/billing/payment/TestJanitor.java
@@ -1,6 +1,6 @@
 /*
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -71,6 +71,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 
 import static com.jayway.awaitility.Awaitility.await;
@@ -508,12 +509,14 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
             await().atMost(timeoutSec, SECONDS).until(new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
+                    boolean completed = true;
                     for (final NotificationEventWithMetadata<NotificationEvent> notificationEvent : notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId())) {
                         if (!notificationEvent.getEffectiveDate().isAfter(clock.getUTCNow())) {
-                            return false;
+                            completed = false;
                         }
+                        // Go through all results to close the connection
                     }
-                    return true;
+                    return completed;
                 }
             });
         } catch (final Exception e) {
@@ -523,7 +526,7 @@ public class TestJanitor extends PaymentTestSuiteWithEmbeddedDB {
 
     private int getPendingNotificationCnt(final InternalCallContext internalCallContext) {
         try {
-            return notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()).size();
+            return Iterables.<NotificationEventWithMetadata>size(notificationQueueService.getNotificationQueue(DefaultPaymentService.SERVICE_NAME, Janitor.QUEUE_NAME).getFutureOrInProcessingNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
         } catch (final Exception e) {
             fail("Test failed ", e);
         }

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index ec1e3ea..2c2ec75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
         <version>0.141-SNAPSHOT</version>
     </parent>
     <artifactId>killbill</artifactId>
-    <version>0.18.4-SNAPSHOT</version>
+    <version>0.18.5-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>killbill</name>
     <description>Library for managing recurring subscriptions and the associated billing</description>
diff --git a/profiles/killbill/pom.xml b/profiles/killbill/pom.xml
index d5c9444..1e98233 100644
--- a/profiles/killbill/pom.xml
+++ b/profiles/killbill/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill-profiles</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles-killbill</artifactId>
diff --git a/profiles/killpay/pom.xml b/profiles/killpay/pom.xml
index 030fee8..d6cd1b3 100644
--- a/profiles/killpay/pom.xml
+++ b/profiles/killpay/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>killbill-profiles</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles-killpay</artifactId>

profiles/pom.xml 2(+1 -1)

diff --git a/profiles/pom.xml b/profiles/pom.xml
index 7764c49..1e1e897 100644
--- a/profiles/pom.xml
+++ b/profiles/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-profiles</artifactId>
diff --git a/subscription/pom.xml b/subscription/pom.xml
index 109e33b..c01bb38 100644
--- a/subscription/pom.xml
+++ b/subscription/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-subscription</artifactId>
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
index c72cf42..ccc67eb 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/svcs/DefaultSubscriptionInternalApi.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -731,7 +731,7 @@ public class DefaultSubscriptionInternalApi extends SubscriptionApiBase implemen
         try {
             final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultSubscriptionBaseService.SUBSCRIPTION_SERVICE_NAME,
                                                                                                       DefaultSubscriptionBaseService.NOTIFICATION_QUEUE_NAME);
-            final List<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
+            final Iterable<NotificationEventWithMetadata<NotificationEvent>> futureNotifications = notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId());
             return Iterables.transform(futureNotifications, new Function<NotificationEventWithMetadata<NotificationEvent>, DateTime>() {
                 @Nullable
                 @Override
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBase.java b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBase.java
index e255419..1dd9003 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBase.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBase.java
@@ -204,6 +204,14 @@ public class DefaultSubscriptionBase extends EntityBase implements SubscriptionB
                                                  : getPreviousTransition().getNextPlan();
     }
 
+    public Plan getCurrentOrPendingPlan() {
+        if (getState() == EntitlementState.PENDING) {
+            return getPendingTransition().getNextPlan();
+        } else {
+            return getCurrentPlan();
+        }
+    }
+
     @Override
     public PriceList getCurrentPriceList() {
         return (getPreviousTransition() == null) ? null :
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
index c7d3e41..77b73c9 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/api/user/DefaultSubscriptionBaseApiService.java
@@ -113,10 +113,6 @@ public class DefaultSubscriptionBaseApiService implements SubscriptionBaseApiSer
 
         try {
 
-            if (subscription.getAlignStartDate().compareTo(effectiveDate) != 0) {
-                throw new RuntimeException("Subscription id = " + subscription.getId() + ", alignStartDate = " + subscription.getAlignStartDate() + ", effectiveDate = " + effectiveDate);
-            }
-
             final List<SubscriptionBaseEvent> events = getEventsOnCreation(subscription.getBundleId(), subscription.getId(), subscription.getAlignStartDate(), subscription.getBundleStartDate(),
                                                                            plan, initialPhase, realPriceList, effectiveDate, processedDate, internalCallContext);
             dao.createSubscription(subscription, events, internalCallContext);
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/addon/AddonUtils.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/addon/AddonUtils.java
index 8fd9da7..5b238c2 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/addon/AddonUtils.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/addon/AddonUtils.java
@@ -47,19 +47,22 @@ public class AddonUtils {
     public void checkAddonCreationRights(final DefaultSubscriptionBase baseSubscription, final Plan targetAddOnPlan, final DateTime requestedDate, final InternalTenantContext context)
             throws SubscriptionBaseApiException, CatalogApiException {
 
-        if (baseSubscription.getState() != EntitlementState.ACTIVE) {
+
+        if (baseSubscription.getState() == EntitlementState.CANCELLED ||
+            (baseSubscription.getState() == EntitlementState.PENDING &&  context.toLocalDate(baseSubscription.getStartDate()).compareTo(context.toLocalDate(requestedDate)) < 0)) {
             throw new SubscriptionBaseApiException(ErrorCode.SUB_CREATE_AO_BP_NON_ACTIVE, targetAddOnPlan.getName());
         }
 
-        final Product baseProduct = catalogService.getFullCatalog(true, true, context).findProduct(baseSubscription.getCurrentPlan().getProduct().getName(), requestedDate);
+        final Plan currentOrPendingPlan = baseSubscription.getCurrentOrPendingPlan();
+        final Product baseProduct = catalogService.getFullCatalog(true, true, context).findProduct(currentOrPendingPlan.getProduct().getName(), requestedDate);
         if (isAddonIncluded(baseProduct, targetAddOnPlan)) {
             throw new SubscriptionBaseApiException(ErrorCode.SUB_CREATE_AO_ALREADY_INCLUDED,
-                                                   targetAddOnPlan.getName(), baseSubscription.getCurrentPlan().getProduct().getName());
+                                                   targetAddOnPlan.getName(), currentOrPendingPlan.getProduct().getName());
         }
 
         if (!isAddonAvailable(baseProduct, targetAddOnPlan)) {
             throw new SubscriptionBaseApiException(ErrorCode.SUB_CREATE_AO_NOT_AVAILABLE,
-                                                   targetAddOnPlan.getName(), baseSubscription.getCurrentPlan().getProduct().getName());
+                                                   targetAddOnPlan.getName(), currentOrPendingPlan.getProduct().getName());
         }
     }
 
diff --git a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
index 9bbc75d..af140fb 100644
--- a/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
+++ b/subscription/src/main/java/org/killbill/billing/subscription/engine/dao/DefaultSubscriptionDao.java
@@ -82,6 +82,7 @@ import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Entity;
 import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
 import org.killbill.billing.util.entity.dao.EntityDaoBase;
 import org.killbill.billing.util.entity.dao.EntitySqlDao;
@@ -97,7 +98,6 @@ import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.sqlobject.customizers.OverrideStatementLocatorWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -206,8 +206,8 @@ public class DefaultSubscriptionDao extends EntityDaoBase<SubscriptionBundleMode
                                                   }
 
                                                   @Override
-                                                  public Iterator<SubscriptionBundleModelDao> build(final BundleSqlDao bundleSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return bundleSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                  public Iterator<SubscriptionBundleModelDao> build(final BundleSqlDao bundleSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return bundleSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,

tenant/pom.xml 2(+1 -1)

diff --git a/tenant/pom.xml b/tenant/pom.xml
index d12ae39..b7db50f 100644
--- a/tenant/pom.xml
+++ b/tenant/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-tenant</artifactId>

usage/pom.xml 2(+1 -1)

diff --git a/usage/pom.xml b/usage/pom.xml
index 386fbcb..345bc3c 100644
--- a/usage/pom.xml
+++ b/usage/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-usage</artifactId>

util/pom.xml 2(+1 -1)

diff --git a/util/pom.xml b/util/pom.xml
index c7237a3..7743895 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.18.4-SNAPSHOT</version>
+        <version>0.18.5-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>killbill-util</artifactId>
diff --git a/util/src/main/java/org/killbill/billing/util/customfield/dao/DefaultCustomFieldDao.java b/util/src/main/java/org/killbill/billing/util/customfield/dao/DefaultCustomFieldDao.java
index 6561b8f..742bf5d 100644
--- a/util/src/main/java/org/killbill/billing/util/customfield/dao/DefaultCustomFieldDao.java
+++ b/util/src/main/java/org/killbill/billing/util/customfield/dao/DefaultCustomFieldDao.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import javax.annotation.Nullable;
 
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,8 +162,8 @@ public class DefaultCustomFieldDao extends EntityDaoBase<CustomFieldModelDao, Cu
                                                   }
 
                                                   @Override
-                                                  public Iterator<CustomFieldModelDao> build(final CustomFieldSqlDao customFieldSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return customFieldSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                  public Iterator<CustomFieldModelDao> build(final CustomFieldSqlDao customFieldSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return customFieldSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
index 8157d11..574d3b0 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/DefaultPaginationSqlDaoHelper.java
@@ -29,6 +29,11 @@ import org.killbill.billing.util.entity.Pagination;
 
 public class DefaultPaginationSqlDaoHelper {
 
+    // Number large enough so that small installations have access to an accurate count
+    // but small enough to not impact very large deployments
+    // TODO Should this be configurable per tenant?
+    private static final Long SIMPLE_PAGINATION_THRESHOLD = 20000L;
+
     private final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao;
 
     public DefaultPaginationSqlDaoHelper(final EntitySqlDaoTransactionalJdbiWrapper transactionalSqlDao) {
@@ -38,13 +43,19 @@ public class DefaultPaginationSqlDaoHelper {
     public <E extends Entity, M extends EntityModelDao<E>, S extends EntitySqlDao<M, E>> Pagination<M> getPagination(final Class<? extends EntitySqlDao<M, E>> sqlDaoClazz,
                                                                                                                      final PaginationIteratorBuilder<M, E, S> paginationIteratorBuilder,
                                                                                                                      final Long offset,
-                                                                                                                     final Long limit,
+                                                                                                                     final Long limitMaybeNegative,
                                                                                                                      @Nullable final InternalTenantContext context) {
+        // Use a negative limit as a hint to go backwards. It's a bit awkward -- using a negative offset instead would be more intuitive,
+        // but it is non-deterministic for the first page unfortunately (limit 0 offset 50: ASC or DESC?)
+        final Ordering ordering = limitMaybeNegative >= 0 ? Ordering.ASC : Ordering.DESC;
+        final Long limit = Math.abs(limitMaybeNegative);
+
         // Note: the connection will be busy as we stream the results out: hence we cannot use
         // SQL_CALC_FOUND_ROWS / FOUND_ROWS on the actual query.
         // We still need to know the actual number of results, mainly for the UI so that it knows if it needs to fetch
         // more pages.
-        final Long count = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Long>() {
+        // Note: for simple pagination (no search filter), this will be computed below instead (MaxNbRecords == TotalNbRecords)
+        final Long totalNbRecordsOrNull = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Long>() {
             @Override
             public Long inTransaction(final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory) throws Exception {
                 final EntitySqlDao<M, E> sqlDao = entitySqlDaoWrapperFactory.become(sqlDaoClazz);
@@ -55,10 +66,20 @@ public class DefaultPaginationSqlDaoHelper {
         // We usually always want to wrap our queries in an EntitySqlDaoTransactionWrapper... except here.
         // Since we want to stream the results out, we don't want to auto-commit when this method returns.
         final EntitySqlDao<M, E> sqlDao = transactionalSqlDao.onDemandForStreamingResults(sqlDaoClazz);
-        final Long totalCount = context !=  null ? sqlDao.getCount(context) : null;
-        final Iterator<M> results = paginationIteratorBuilder.build((S) sqlDao, limit, context);
+        // The count to get maxNbRecords can be expensive on very large datasets. As a heuristic to check how large that number is,
+        // we retrieve 1 record at offset SIMPLE_PAGINATION_THRESHOLD (pretty fast). If we've found a record, that means the count is larger
+        // than this threshold and we don't issue the full count query
+        final Long maxNbRecords;
+        if (context == null || paginationIteratorBuilder.build((S) sqlDao, SIMPLE_PAGINATION_THRESHOLD, 1L, ordering, context).hasNext()) {
+            maxNbRecords = null;
+        } else {
+            maxNbRecords = sqlDao.getCount(context);
+        }
+        final Iterator<M> results = paginationIteratorBuilder.build((S) sqlDao, offset, limit, ordering, context);
+
+        final Long totalNbRecords = totalNbRecordsOrNull == null ? maxNbRecords : totalNbRecordsOrNull;
 
-        return new DefaultPagination<M>(offset, limit, count, totalCount, results);
+        return new DefaultPagination<M>(offset, limit, totalNbRecords, maxNbRecords, results);
     }
 
     public abstract static class PaginationIteratorBuilder<M extends EntityModelDao<E>, E extends Entity, S extends EntitySqlDao<M, E>> {
@@ -68,6 +89,11 @@ public class DefaultPaginationSqlDaoHelper {
         // - For get calls, return the total number of records (totalNbRecords == maxNbRecords)
         public abstract Long getCount(final S sqlDao, final InternalTenantContext context);
 
-        public abstract Iterator<M> build(final S sqlDao, final Long limit, final InternalTenantContext context);
+        public abstract Iterator<M> build(final S sqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context);
+    }
+
+    public enum Ordering {
+        ASC,
+        DESC
     }
 }
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/EntityDaoBase.java b/util/src/main/java/org/killbill/billing/util/entity/dao/EntityDaoBase.java
index ad472b6..32ad513 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/EntityDaoBase.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/EntityDaoBase.java
@@ -26,6 +26,7 @@ import org.killbill.billing.util.audit.ChangeType;
 import org.killbill.billing.util.entity.DefaultPagination;
 import org.killbill.billing.util.entity.Entity;
 import org.killbill.billing.util.entity.Pagination;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.PaginationIteratorBuilder;
 
 public abstract class EntityDaoBase<M extends EntityModelDao<E>, E extends Entity, U extends BillingExceptionBase> implements EntityDao<M, E, U> {
@@ -137,12 +138,13 @@ public abstract class EntityDaoBase<M extends EntityModelDao<E>, E extends Entit
                                               new PaginationIteratorBuilder<M, E, EntitySqlDao<M, E>>() {
                                                   @Override
                                                   public Long getCount(final EntitySqlDao<M, E> sqlDao, final InternalTenantContext context) {
-                                                      return sqlDao.getCount(context);
+                                                      // Only need to compute it once, because no search filter has been applied (see DefaultPaginationSqlDaoHelper)
+                                                      return null;
                                                   }
 
                                                   @Override
-                                                  public Iterator<M> build(final EntitySqlDao<M, E> sqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return sqlDao.get(offset, limit, getNaturalOrderingColumns(), context);
+                                                  public Iterator<M> build(final EntitySqlDao<M, E> sqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return sqlDao.get(offset, limit, getNaturalOrderingColumns(), ordering.toString(), context);
                                                   }
                                               },
                                               offset,
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDao.java b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDao.java
index b5aa2a0..d20a5e7 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDao.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDao.java
@@ -73,6 +73,7 @@ public interface EntitySqlDao<M extends EntityModelDao<E>, E extends Entity> ext
                               @Bind("likeSearchKey") final String likeSearchKey,
                               @Bind("offset") final Long offset,
                               @Bind("rowCount") final Long rowCount,
+                              @Define("ordering") final String ordering,
                               @BindBean final InternalTenantContext context);
 
     @SqlQuery
@@ -89,6 +90,7 @@ public interface EntitySqlDao<M extends EntityModelDao<E>, E extends Entity> ext
     public Iterator<M> get(@Bind("offset") final Long offset,
                            @Bind("rowCount") final Long rowCount,
                            @Define("orderBy") final String orderBy,
+                           @Define("ordering") final String ordering,
                            @BindBean final InternalTenantContext context);
 
     @SqlQuery
diff --git a/util/src/main/java/org/killbill/billing/util/entity/DefaultPagination.java b/util/src/main/java/org/killbill/billing/util/entity/DefaultPagination.java
index 81401d6..bdd5e16 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/DefaultPagination.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/DefaultPagination.java
@@ -67,7 +67,8 @@ public class DefaultPagination<T> implements Pagination<T> {
                              @Nullable final Long totalNbRecords, @Nullable final Long maxNbRecords,
                              final Iterator<T> delegateIterator) {
         this.currentOffset = currentOffset;
-        this.limit = limit;
+        // See DefaultPaginationSqlDaoHelper
+        this.limit = Math.abs(limit);
         this.totalNbRecords = totalNbRecords;
         this.maxNbRecords = maxNbRecords;
         this.delegateIterator = delegateIterator;
diff --git a/util/src/main/java/org/killbill/billing/util/tag/dao/DefaultTagDao.java b/util/src/main/java/org/killbill/billing/util/tag/dao/DefaultTagDao.java
index 1b4c9ec..b6342ee 100644
--- a/util/src/main/java/org/killbill/billing/util/tag/dao/DefaultTagDao.java
+++ b/util/src/main/java/org/killbill/billing/util/tag/dao/DefaultTagDao.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.entity.dao.DefaultPaginationSqlDaoHelper.Ordering;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,8 +225,8 @@ public class DefaultTagDao extends EntityDaoBase<TagModelDao, Tag, TagApiExcepti
                                                   }
 
                                                   @Override
-                                                  public Iterator<TagModelDao> build(final TagSqlDao tagSqlDao, final Long limit, final InternalTenantContext context) {
-                                                      return tagSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, context);
+                                                  public Iterator<TagModelDao> build(final TagSqlDao tagSqlDao, final Long offset, final Long limit, final Ordering ordering, final InternalTenantContext context) {
+                                                      return tagSqlDao.search(searchKey, String.format("%%%s%%", searchKey), offset, limit, ordering.toString(), context);
                                                   }
                                               },
                                               offset,
diff --git a/util/src/main/resources/cleanAccount.sql b/util/src/main/resources/cleanAccount.sql
index 6abade9..0224d1f 100644
--- a/util/src/main/resources/cleanAccount.sql
+++ b/util/src/main/resources/cleanAccount.sql
@@ -4,68 +4,17 @@ CREATE PROCEDURE cleanAccount(p_account_key varchar(36))
 BEGIN
 
     DECLARE v_account_record_id bigint /*! unsigned */;
+    DECLARE v_tenant_record_id bigint /*! unsigned */;
 
-    select record_id from accounts WHERE external_key = p_account_key into v_account_record_id;
+    select record_id, tenant_record_id from accounts WHERE external_key = p_account_key into v_account_record_id, v_tenant_record_id;
 
-    DELETE FROM analytics_account_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_account_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_account_transitions WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_accounts WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_bundle_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_bundle_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_bundles WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_chargebacks WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_adjustments WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_credits WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_item_adjustments WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_items WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoice_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_invoices WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_payment_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_payment_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_payments WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_refunds WHERE account_record_id = v_account_record_id;
-    DELETE FROM analytics_subscription_transitions WHERE account_record_id = v_account_record_id;
+    call trimAccount(p_account_key);
 
-    DELETE FROM accounts WHERE record_id = v_account_record_id;
-    DELETE FROM account_emails WHERE account_record_id = v_account_record_id;
-    DELETE FROM account_email_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM account_history WHERE target_record_id = v_account_record_id;
-    DELETE FROM audit_log WHERE account_record_id = v_account_record_id;
-    DELETE FROM bac WHERE account_record_id = v_account_record_id;
-    DELETE FROM bac_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM bac_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM bii WHERE account_record_id = v_account_record_id;
-    DELETE FROM bin WHERE account_record_id = v_account_record_id;
-    DELETE FROM bin_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM bin_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM bip WHERE account_record_id = v_account_record_id;
-    DELETE FROM bip_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM bip_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM blocking_states WHERE account_record_id = v_account_record_id;
-    DELETE FROM bos WHERE account_record_id = v_account_record_id;
-    DELETE FROM bst WHERE account_record_id = v_account_record_id;
-    DELETE FROM bst_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM bst_tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM bundles WHERE account_record_id = v_account_record_id;
-    DELETE FROM custom_field_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM custom_fields WHERE account_record_id = v_account_record_id;
-    DELETE FROM invoice_payments WHERE account_record_id = v_account_record_id;
-    DELETE FROM invoices WHERE account_record_id = v_account_record_id;
-    DELETE FROM invoice_items WHERE account_record_id = v_account_record_id;
-    DELETE FROM payment_attempt_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM payment_attempts WHERE account_record_id = v_account_record_id;
-    DELETE FROM payment_methods WHERE account_record_id = v_account_record_id;
-    DELETE FROM payment_method_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM payment_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM payments WHERE account_record_id = v_account_record_id;
-    DELETE FROM refunds WHERE account_record_id = v_account_record_id;
-    DELETE FROM refund_history WHERE account_record_id = v_account_record_id;
-    DELETE FROM subscriptions WHERE account_record_id = v_account_record_id;
-    DELETE FROM subscription_events WHERE account_record_id = v_account_record_id;
-    DELETE FROM tags WHERE account_record_id = v_account_record_id;
-    DELETE FROM tag_history WHERE account_record_id = v_account_record_id;
+    DELETE FROM account_history WHERE target_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM accounts WHERE record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM audit_log WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_method_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_methods WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
 
     END;
 //
diff --git a/util/src/main/resources/org/killbill/billing/util/ddl.sql b/util/src/main/resources/org/killbill/billing/util/ddl.sql
index 8e21ad6..552f8dd 100644
--- a/util/src/main/resources/org/killbill/billing/util/ddl.sql
+++ b/util/src/main/resources/org/killbill/billing/util/ddl.sql
@@ -193,6 +193,7 @@ CREATE TABLE notifications_history (
     future_user_token varchar(36),
     PRIMARY KEY(record_id)
 ) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
+CREATE INDEX notifications_history_tenant_account_record_id ON notifications_history(search_key2, search_key1);
 
 DROP TABLE IF EXISTS bus_events;
 CREATE TABLE bus_events (
@@ -231,6 +232,7 @@ CREATE TABLE bus_events_history (
     search_key2 bigint /*! unsigned */ not null default 0,
     PRIMARY KEY(record_id)
 ) /*! CHARACTER SET utf8 COLLATE utf8_bin */;
+CREATE INDEX bus_events_history_tenant_account_record_id ON bus_events_history(search_key2, search_key1);
 
 drop table if exists sessions;
 create table sessions (
diff --git a/util/src/main/resources/org/killbill/billing/util/entity/dao/EntitySqlDao.sql.stg b/util/src/main/resources/org/killbill/billing/util/entity/dao/EntitySqlDao.sql.stg
index 6f35450..a79883d 100644
--- a/util/src/main/resources/org/killbill/billing/util/entity/dao/EntitySqlDao.sql.stg
+++ b/util/src/main/resources/org/killbill/billing/util/entity/dao/EntitySqlDao.sql.stg
@@ -147,7 +147,7 @@ where <CHECK_TENANT("t.")>
 ;
 >>
 
-get(offset, rowCount, orderBy) ::= <<
+get(offset, rowCount, orderBy, ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
@@ -156,10 +156,10 @@ join (
   from <tableName()>
   where <CHECK_TENANT()>
   <andCheckSoftDeletionWithComma()>
-  order by <orderBy>
+  order by <orderBy> <ordering>
   limit :rowCount offset :offset
 ) optimization on <recordIdField("optimization.")> = <recordIdField("t.")>
-order by t.<orderBy>
+order by t.<orderBy> <ordering>
 ;
 >>
 
@@ -268,14 +268,14 @@ searchQuery(prefix) ::= <<
 1 = 1
 >>
 
-search() ::= <<
+search(ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
 where (<searchQuery("t.")>)
 <andCheckSoftDeletionWithComma("t.")>
 <AND_CHECK_TENANT("t.")>
-order by <recordIdField("t.")> ASC
+order by <recordIdField("t.")> <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/util/src/main/resources/org/killbill/billing/util/migration/V20170209083747__queues_history_indexes.sql b/util/src/main/resources/org/killbill/billing/util/migration/V20170209083747__queues_history_indexes.sql
new file mode 100644
index 0000000..f873b01
--- /dev/null
+++ b/util/src/main/resources/org/killbill/billing/util/migration/V20170209083747__queues_history_indexes.sql
@@ -0,0 +1,3 @@
+alter table notifications_history add index notifications_history_tenant_account_record_id(search_key2, search_key1);
+alter table bus_events_history add index bus_events_history_tenant_account_record_id(search_key2, search_key1);
+alter table bus_ext_events_history add index bus_ext_events_history_tenant_account_record_id(search_key2, search_key1);
diff --git a/util/src/main/resources/org/killbill/billing/util/tag/dao/TagSqlDao.sql.stg b/util/src/main/resources/org/killbill/billing/util/tag/dao/TagSqlDao.sql.stg
index 3240d91..177ec5b 100644
--- a/util/src/main/resources/org/killbill/billing/util/tag/dao/TagSqlDao.sql.stg
+++ b/util/src/main/resources/org/killbill/billing/util/tag/dao/TagSqlDao.sql.stg
@@ -114,7 +114,7 @@ searchQuery(tagAlias, tagDefinitionAlias) ::= <<
   or <tagDefinitionAlias>description like :likeSearchKey
 >>
 
-search() ::= <<
+search(ordering) ::= <<
 select
 <allTableFields("t.")>
 from <tableName()> t
@@ -122,7 +122,7 @@ join (<userAndSystemTagDefinitions()>) td on td.id = t.tag_definition_id
 where (<searchQuery(tagAlias="t.", tagDefinitionAlias="td.")>)
 <andCheckSoftDeletionWithComma("t.")>
 <AND_CHECK_TENANT("t.")>
-order by <recordIdField("t.")> ASC
+order by <recordIdField("t.")> <ordering>
 limit :rowCount offset :offset
 ;
 >>
diff --git a/util/src/main/resources/trimAccount.sql b/util/src/main/resources/trimAccount.sql
new file mode 100644
index 0000000..0b3f438
--- /dev/null
+++ b/util/src/main/resources/trimAccount.sql
@@ -0,0 +1,72 @@
+drop procedure if exists trimAccount;
+DELIMITER //
+CREATE PROCEDURE trimAccount(p_account_key varchar(36))
+BEGIN
+
+    DECLARE v_account_record_id bigint /*! unsigned */;
+    DECLARE v_tenant_record_id bigint /*! unsigned */;
+
+    select record_id, tenant_record_id from accounts WHERE external_key = p_account_key into v_account_record_id, v_tenant_record_id;
+
+    DELETE FROM analytics_account_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_account_tags WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_account_transitions WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_accounts WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_bundle_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_bundle_tags WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_bundles WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_adjustments WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_credits WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_item_adjustments WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_items WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_payment_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoice_tags WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_invoices WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_notifications WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM analytics_notifications_history WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM analytics_payment_auths WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_captures WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_chargebacks WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_credits WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_method_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_purchases WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_refunds WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_tags WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_payment_voids WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_subscription_transitions WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM analytics_transaction_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+
+    DELETE FROM account_email_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM account_emails WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM audit_log WHERE table_name not in ('ACCOUNT_HISTORY', 'PAYMENT_METHOD_HISTORY') and account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM blocking_states WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM bundles WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM bus_events WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM bus_events_history WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM bus_ext_events WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM bus_ext_events_history WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM custom_field_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM custom_fields WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM invoice_items WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM invoice_parent_children WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM invoice_payments WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM invoices WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM notifications WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM notifications_history WHERE search_key1 = v_account_record_id and search_key2 = v_tenant_record_id;
+    DELETE FROM payment_attempt_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_attempts WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_transaction_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payment_transactions WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM payments WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM rolled_up_usage WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM subscription_events WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM subscriptions WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM tag_history WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+    DELETE FROM tags WHERE account_record_id = v_account_record_id and tenant_record_id = v_tenant_record_id;
+
+    END;
+//
+DELIMITER ;
diff --git a/util/src/test/java/org/killbill/billing/util/dao/TestPagination.java b/util/src/test/java/org/killbill/billing/util/dao/TestPagination.java
index 502ecfd..107585e 100644
--- a/util/src/test/java/org/killbill/billing/util/dao/TestPagination.java
+++ b/util/src/test/java/org/killbill/billing/util/dao/TestPagination.java
@@ -44,12 +44,12 @@ public class TestPagination extends UtilTestSuiteWithEmbeddedDB {
 
         // Tests via SQL dao directly
         Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.getAll(internalCallContext)).size(), 10);
-        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, 100L, "record_id", internalCallContext)).size(), 10);
-        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(5L, 100L, "record_id", internalCallContext)).size(), 5);
-        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(5L, 10L, "record_id", internalCallContext)).size(), 5);
-        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, 5L, "record_id", internalCallContext)).size(), 5);
+        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, 100L, "record_id", "asc", internalCallContext)).size(), 10);
+        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(5L, 100L, "record_id", "asc", internalCallContext)).size(), 5);
+        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(5L, 10L, "record_id", "asc", internalCallContext)).size(), 5);
+        Assert.assertEquals(ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, 5L, "record_id", "asc", internalCallContext)).size(), 5);
         for (int i = 0; i < 10; i++) {
-            final List<TagDefinitionModelDao> tagDefinitions = ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, (long) i, "record_id", internalCallContext));
+            final List<TagDefinitionModelDao> tagDefinitions = ImmutableList.<TagDefinitionModelDao>copyOf(tagDefinitionSqlDao.get(0L, (long) i, "record_id", "asc", internalCallContext));
             Assert.assertEquals(tagDefinitions.size(), i);
 
             for (int j = 0; j < tagDefinitions.size(); j++) {
diff --git a/util/src/test/java/org/killbill/billing/util/dao/TestStringTemplateInheritance.java b/util/src/test/java/org/killbill/billing/util/dao/TestStringTemplateInheritance.java
index cc50c48..a93add6 100644
--- a/util/src/test/java/org/killbill/billing/util/dao/TestStringTemplateInheritance.java
+++ b/util/src/test/java/org/killbill/billing/util/dao/TestStringTemplateInheritance.java
@@ -114,24 +114,24 @@ public class TestStringTemplateInheritance extends UtilTestSuiteNoDB {
                                                                    "where t.tenant_record_id = :tenantRecordId\r?\n" +
                                                                    "order by t.record_id ASC\r?\n" +
                                                                    ";");
-        assertPattern(kombucha.getInstanceOf("get", ImmutableMap.<String, String>of("orderBy", "record_id", "offset", "3", "rowCount", "12")).toString(), "select\r?\n" +
-                                                                                                                                                          "  t.record_id\r?\n" +
-                                                                                                                                                          ", t.id\r?\n" +
-                                                                                                                                                          ", t.tea\r?\n" +
-                                                                                                                                                          ", t.mushroom\r?\n" +
-                                                                                                                                                          ", t.sugar\r?\n" +
-                                                                                                                                                          ", t.account_record_id\r?\n" +
-                                                                                                                                                          ", t.tenant_record_id\r?\n" +
-                                                                                                                                                          "from kombucha t\r?\n" +
-                                                                                                                                                          "join \\(\r?\n" +
-                                                                                                                                                          "  select record_id\r?\n" +
-                                                                                                                                                          "  from kombucha\r?\n" +
-                                                                                                                                                          "  where tenant_record_id = :tenantRecordId\r?\n" +
-                                                                                                                                                          "  order by record_id\r?\n" +
-                                                                                                                                                          "  limit :rowCount offset :offset\r?\n" +
-                                                                                                                                                          "\\) optimization on optimization.record_id = t.record_id\r?\n" +
-                                                                                                                                                          "order by t.record_id\r?\n" +
-                                                                                                                                                          ";");
+        assertPattern(kombucha.getInstanceOf("get", ImmutableMap.<String, String>of("orderBy", "record_id", "offset", "3", "rowCount", "12", "ordering", "ASC")).toString(), "select\r?\n" +
+                                                                                                                                                                             "  t.record_id\r?\n" +
+                                                                                                                                                                             ", t.id\r?\n" +
+                                                                                                                                                                             ", t.tea\r?\n" +
+                                                                                                                                                                             ", t.mushroom\r?\n" +
+                                                                                                                                                                             ", t.sugar\r?\n" +
+                                                                                                                                                                             ", t.account_record_id\r?\n" +
+                                                                                                                                                                             ", t.tenant_record_id\r?\n" +
+                                                                                                                                                                             "from kombucha t\r?\n" +
+                                                                                                                                                                             "join \\(\r?\n" +
+                                                                                                                                                                             "  select record_id\r?\n" +
+                                                                                                                                                                             "  from kombucha\r?\n" +
+                                                                                                                                                                             "  where tenant_record_id = :tenantRecordId\r?\n" +
+                                                                                                                                                                             "  order by record_id ASC\r?\n" +
+                                                                                                                                                                             "  limit :rowCount offset :offset\r?\n" +
+                                                                                                                                                                             "\\) optimization on optimization.record_id = t.record_id\r?\n" +
+                                                                                                                                                                             "order by t.record_id ASC\r?\n" +
+                                                                                                                                                                             ";");
         assertPattern(kombucha.getInstanceOf("test").toString(), "select\r?\n" +
                                                                  "  t.record_id\r?\n" +
                                                                  ", t.id\r?\n" +