killbill-aplcache

Details

diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
new file mode 100644
index 0000000..1f54f9c
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.api;
+
+import org.killbill.billing.platform.api.KillbillService;
+
+public interface InvoiceListenerService extends KillbillService {
+}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 606d7dc..fc49f94 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014 Groupon, Inc
- * Copyright 2014 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -27,10 +27,8 @@ import org.joda.time.DateTime;
 import org.killbill.billing.DBTestingHelper;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.beatrix.extbus.DefaultBusExternalEvent;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.catalog.api.BillingPeriod;
-import org.killbill.billing.catalog.api.PriceListSet;
 import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.api.DefaultEntitlement;
 import org.killbill.billing.notification.plugin.api.ExtBusEvent;
@@ -46,11 +44,6 @@ import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.jackson.ObjectMapper;
-import org.killbill.billing.util.nodes.NodeCommand;
-import org.killbill.billing.util.nodes.NodeCommandMetadata;
-import org.killbill.billing.util.nodes.NodeCommandProperty;
-import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
-import org.killbill.billing.util.nodes.SystemNodeCommandType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -58,65 +51,30 @@ import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.Subscribe;
 
-import static org.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.testng.Assert.assertNotNull;
 
 public class TestPublicBus extends TestIntegrationBase {
 
-    private PublicListener publicListener;
+    private static final ObjectMapper mapper = new ObjectMapper();
 
+    private PublicListener publicListener;
     private AtomicInteger externalBusCount;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     @Override
     protected KillbillConfigSource getConfigSource() {
-        ImmutableMap additionalProperties = new ImmutableMap.Builder()
-                .put("org.killbill.billing.util.broadcast.rate", "500ms")
-                .build();
+        final ImmutableMap<String, String> additionalProperties = new ImmutableMap.Builder<String, String>().put("org.killbill.billing.util.broadcast.rate", "500ms")
+                                                                                                            .build();
         return getConfigSource("/beatrix.properties", additionalProperties);
     }
 
-
-    public class PublicListener {
-
-        @Subscribe
-        public void handleExternalEvents(final ExtBusEvent event) {
-            log.info("GOT EXT EVENT " + event);
-
-            if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
-                try {
-                    final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
-                    Assert.assertNotNull(obj.getBundleExternalKey());
-                    Assert.assertNotNull(obj.getActionType());
-                } catch (JsonParseException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                } catch (JsonMappingException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                } catch (IOException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                }
-            }
-
-            externalBusCount.incrementAndGet();
-
-        }
-    }
-
     @Override
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
-
         /*
         We copy the initialization instead of invoking the super method so we can add the registration
         of the publicBus event;
@@ -162,7 +120,6 @@ public class TestPublicBus extends TestIntegrationBase {
 
     @Test(groups = "slow")
     public void testSimple() throws Exception {
-
         final DateTime initialDate = new DateTime(2012, 2, 1, 0, 3, 42, 0, testTimeZone);
         final int billingDay = 2;
 
@@ -189,7 +146,6 @@ public class TestPublicBus extends TestIntegrationBase {
 
         addDaysAndCheckForCompletion(31, NextEvent.PHASE, NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
 
-
         await().atMost(10, SECONDS).until(new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -197,12 +153,10 @@ public class TestPublicBus extends TestIntegrationBase {
                 return externalBusCount.get() == 10;
             }
         });
-
     }
 
     @Test(groups = "slow")
     public void testTenantKVChange() throws Exception {
-
         final TenantData tenantData = new DefaultTenant(null, clock.getUTCNow(), clock.getUTCNow(), "MY_TENANT", "key", "s3Cr3T");
         final CallContext contextWithNoTenant = new DefaultCallContext(null, null, "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
         final Tenant tenant = tenantUserApi.createTenant(tenantData, contextWithNoTenant);
@@ -219,4 +173,34 @@ public class TestPublicBus extends TestIntegrationBase {
             }
         });
     }
+
+    public class PublicListener {
+
+        @Subscribe
+        public void handleExternalEvents(final ExtBusEvent event) {
+            log.info("GOT EXT EVENT " + event);
+
+            if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
+                try {
+                    final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
+                    Assert.assertNotNull(obj.getBundleExternalKey());
+                    Assert.assertNotNull(obj.getActionType());
+                } catch (final JsonParseException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                } catch (final JsonMappingException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                } catch (final IOException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                }
+            }
+
+            externalBusCount.incrementAndGet();
+
+        }
+    }
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
new file mode 100644
index 0000000..03dcc30
--- /dev/null
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.beatrix.integration;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountData;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.ProductCategory;
+import org.killbill.billing.entitlement.api.DefaultEntitlement;
+import org.killbill.billing.invoice.api.DefaultInvoiceService;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.invoice.model.TaxInvoiceItem;
+import org.killbill.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApi;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryNotificationEvent;
+import org.killbill.queue.retry.RetryableService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestWithInvoicePlugin extends TestIntegrationBase {
+
+    @Inject
+    private OSGIServiceRegistration<InvoicePluginApi> pluginRegistry;
+
+    @Inject
+    private NotificationQueueService notificationQueueService;
+
+    private TestInvoicePluginApi testInvoicePluginApi;
+
+    @BeforeClass(groups = "slow")
+    public void beforeClass() throws Exception {
+        super.beforeClass();
+
+        this.testInvoicePluginApi = new TestInvoicePluginApi();
+        pluginRegistry.registerService(new OSGIServiceDescriptor() {
+            @Override
+            public String getPluginSymbolicName() {
+                return "TestInvoicePluginApi";
+            }
+
+            @Override
+            public String getPluginName() {
+                return "TestInvoicePluginApi";
+            }
+
+            @Override
+            public String getRegistrationName() {
+                return "TestInvoicePluginApi";
+            }
+        }, testInvoicePluginApi);
+    }
+
+    @Test(groups = "slow")
+    public void testWithRetries() throws Exception {
+        // We take april as it has 30 days (easier to play with BCD)
+        // Set clock to the initial start date - we implicitly assume here that the account timezone is UTC
+        clock.setDay(new LocalDate(2012, 4, 1));
+
+        final AccountData accountData = getAccountData(1);
+        final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+        accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+        // Make invoice plugin fail
+        testInvoicePluginApi.shouldThrowException = true;
+
+        // Create original subscription (Trial PHASE)
+        final DefaultEntitlement bpSubscription = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK);
+        subscriptionChecker.checkSubscriptionCreated(bpSubscription.getId(), internalCallContext);
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 0);
+
+        // Verify bus event has moved to the retry service (can't easily check the timestamp unfortunately)
+        // No future notification at this point (FIXED item, the PHASE event is the trigger for the next one)
+        checkRetryBusEvents(1, 0);
+
+        // Add 5'
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        checkRetryBusEvents(2, 0);
+
+        // Fix invoice plugin
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDeltaFromReality(10 * 60 * 1000);
+        assertListenerStatus();
+        // No notification in the main queue at this point (the PHASE event is the trigger for the next one)
+        checkNotificationsNoRetry(0);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 1);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    1,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.FIXED, BigDecimal.ZERO),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.setDay(new LocalDate("2012-05-01"));
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    2,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), new LocalDate(2012, 6, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        // Make invoice plugin fail again
+        testInvoicePluginApi.shouldThrowException = true;
+
+        clock.addMonths(1);
+        assertListenerStatus();
+
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+
+        // Verify notification has moved to the retry service
+        checkRetryNotifications("2012-06-01T00:05:00", 1);
+
+        // Add 5'
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        // Verify there are no notification duplicates
+        checkRetryNotifications("2012-06-01T00:15:00", 1);
+
+        // Fix invoice plugin
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDeltaFromReality(10 * 60 * 1000);
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        // Invoice was generated
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    3,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), new LocalDate(2012, 7, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        // Make invoice plugin fail again
+        testInvoicePluginApi.shouldThrowException = true;
+
+        clock.setTime(new DateTime("2012-07-01T00:00:00"));
+        assertListenerStatus();
+
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+
+        // Verify notification has moved to the retry service
+        checkRetryNotifications("2012-07-01T00:05:00", 1);
+
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 4);
+    }
+
+    private void checkRetryBusEvents(final int retryNb, final int expectedFutureInvoiceNotifications) throws NoSuchNotificationQueue {
+        // Verify notification(s) moved to the retry queue
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                final List<NotificationEventWithMetadata> futureInvoiceRetryableBusEvents = getFutureInvoiceRetryableBusEvents();
+                return futureInvoiceRetryableBusEvents.size() == 1 && ((RetryNotificationEvent) futureInvoiceRetryableBusEvents.get(0).getEvent()).getRetryNb() == retryNb;
+            }
+        });
+        assertEquals(getFutureInvoiceNotifications().size(), expectedFutureInvoiceNotifications);
+    }
+
+    private void checkRetryNotifications(final String retryDateTime, final int expectedFutureInvoiceNotifications) throws NoSuchNotificationQueue {
+        // Verify notification(s) moved to the retry queue
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                final List<NotificationEventWithMetadata> futureInvoiceRetryableNotifications = getFutureInvoiceRetryableNotifications();
+                return futureInvoiceRetryableNotifications.size() == 1 && futureInvoiceRetryableNotifications.get(0).getEffectiveDate().compareTo(new DateTime(retryDateTime, DateTimeZone.UTC)) == 0;
+            }
+        });
+        assertEquals(getFutureInvoiceNotifications().size(), expectedFutureInvoiceNotifications);
+    }
+
+    private void checkNotificationsNoRetry(final int main) throws NoSuchNotificationQueue {
+        assertEquals(getFutureInvoiceRetryableNotifications().size(), 0);
+        assertEquals(getFutureInvoiceNotifications().size(), main);
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceNotifications() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceRetryableNotifications() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceRetryableBusEvents() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, "invoice-listener");
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    public class TestInvoicePluginApi implements InvoicePluginApi {
+
+        boolean shouldThrowException = false;
+
+        @Override
+        public List<InvoiceItem> getAdditionalInvoiceItems(final Invoice invoice, final boolean isDryRun, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) {
+            if (shouldThrowException) {
+                throw new InvoicePluginApiRetryException();
+            }
+            return ImmutableList.<InvoiceItem>of(createTaxInvoiceItem(invoice));
+        }
+
+        private InvoiceItem createTaxInvoiceItem(final Invoice invoice) {
+            return new TaxInvoiceItem(invoice.getId(), invoice.getAccountId(), null, "Tax Item", clock.getUTCNow().toLocalDate(), BigDecimal.ONE, invoice.getCurrency());
+        }
+    }
+}
diff --git a/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql b/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
index 0bbd08d..dcae41e 100644
--- a/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
+++ b/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
@@ -32,7 +32,7 @@ CREATE INDEX catalog_override_phase_definition_idx ON catalog_override_phase_def
 DROP TABLE IF EXISTS catalog_override_plan_phase;
 CREATE TABLE catalog_override_plan_phase (
     record_id serial unique,
-    phase_number smallint /*! unsigned */ NOT NULL,
+    phase_number int /*! unsigned */ NOT NULL,
     phase_def_record_id bigint /*! unsigned */ not null,
     target_plan_def_record_id bigint /*! unsigned */ not null,
     created_date datetime NOT NULL,
@@ -80,8 +80,8 @@ create table catalog_override_block_definition
 (
 record_id serial unique,
 parent_unit_name varchar(255) NOT NULL,
-size double NOT NULL,
-max double NULL,
+size decimal(15,9) NOT NULL,
+max decimal(15,9) NULL,
 currency varchar(3) NOT NULL,
 price decimal(15,9) NOT NULL,
 effective_date datetime NOT NULL,
@@ -97,7 +97,7 @@ DROP TABLE IF EXISTS catalog_override_phase_usage;
 create table catalog_override_phase_usage
 (
 record_id serial unique,
-usage_number smallint(5) unsigned,
+usage_number int /*! unsigned */,
 usage_def_record_id  bigint /*! unsigned */ not null,
 target_phase_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
@@ -111,7 +111,7 @@ DROP TABLE IF EXISTS catalog_override_usage_tier;
 create table catalog_override_usage_tier
 (
 record_id serial unique,
-tier_number smallint(5) unsigned,
+tier_number int /*! unsigned */,
 tier_def_record_id bigint /*! unsigned */ not null,
 target_usage_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
@@ -126,7 +126,7 @@ DROP TABLE IF EXISTS catalog_override_tier_block;
 create table catalog_override_tier_block
 (
 record_id serial unique,
-block_number smallint(5) unsigned,
+block_number int /*! unsigned */,
 block_def_record_id bigint /*! unsigned */ not null,
 target_tier_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
diff --git a/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql b/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
index 85f5589..0abe200 100644
--- a/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
+++ b/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
@@ -15,6 +15,7 @@ PRIMARY KEY(record_id)
 );
 CREATE INDEX catalog_override_usage_definition_idx ON catalog_override_usage_definition(tenant_record_id, parent_usage_name, currency);
 
+
 DROP TABLE IF EXISTS catalog_override_tier_definition;
 create table catalog_override_tier_definition
 (
@@ -35,8 +36,8 @@ create table catalog_override_block_definition
 (
 record_id serial unique,
 parent_unit_name varchar(255) NOT NULL,
-size double NOT NULL,
-max double NULL,
+size decimal(15,9) NOT NULL,
+max decimal(15,9) NULL,
 currency varchar(3) NOT NULL,
 price decimal(15,9) NOT NULL,
 effective_date datetime NOT NULL,
@@ -52,7 +53,7 @@ DROP TABLE IF EXISTS catalog_override_phase_usage;
 create table catalog_override_phase_usage
 (
 record_id serial unique,
-usage_number smallint(5) unsigned,
+usage_number int /*! unsigned */,
 usage_def_record_id  bigint /*! unsigned */ not null,
 target_phase_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
@@ -66,7 +67,7 @@ DROP TABLE IF EXISTS catalog_override_usage_tier;
 create table catalog_override_usage_tier
 (
 record_id serial unique,
-tier_number smallint(5) unsigned,
+tier_number int /*! unsigned */,
 tier_def_record_id bigint /*! unsigned */ not null,
 target_usage_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
@@ -81,7 +82,7 @@ DROP TABLE IF EXISTS catalog_override_tier_block;
 create table catalog_override_tier_block
 (
 record_id serial unique,
-block_number smallint(5) unsigned,
+block_number int /*! unsigned */,
 block_def_record_id bigint /*! unsigned */ not null,
 target_tier_def_record_id bigint /*! unsigned */ not null,
 created_date datetime NOT NULL,
@@ -89,6 +90,4 @@ created_by varchar(50) NOT NULL,
 tenant_record_id bigint /*! unsigned */ NOT NULL default 0,
 PRIMARY KEY(record_id)
 );
-CREATE INDEX catalog_override_tier_block_idx ON catalog_override_tier_block(tenant_record_id, block_number, block_def_record_id);
-
-
+CREATE INDEX catalog_override_tier_block_idx ON catalog_override_tier_block(tenant_record_id, block_number, block_def_record_id);
\ No newline at end of file
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index 2fc4a1f..eabea2e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -26,6 +26,7 @@ import org.killbill.billing.invoice.ParkedAccountsManager;
 import org.killbill.billing.invoice.api.DefaultInvoiceService;
 import org.killbill.billing.invoice.api.InvoiceApiHelper;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
 import org.killbill.billing.invoice.api.InvoicePaymentApi;
 import org.killbill.billing.invoice.api.InvoiceService;
 import org.killbill.billing.invoice.api.InvoiceUserApi;
@@ -99,8 +100,9 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
         bind(InvoiceConfig.class).to(MultiTenantInvoiceConfig.class).asEagerSingleton();
     }
 
-    protected void installInvoiceService() {
+    protected void installInvoiceServices() {
         bind(InvoiceService.class).to(DefaultInvoiceService.class).asEagerSingleton();
+        bind(InvoiceListenerService.class).to(InvoiceListener.class).asEagerSingleton();
     }
 
     protected void installResourceBundleFactory() {
@@ -143,7 +145,7 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
         installConfig();
 
         installInvoicePluginApi();
-        installInvoiceService();
+        installInvoiceServices();
         installNotifiers();
         installInvoiceDispatcher();
         installInvoiceListener();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index 2d547ac..d55ebac 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -364,54 +364,38 @@ public class InvoiceDispatcher {
 
     private Invoice processAccountWithLockAndInputTargetDate(final UUID accountId, final LocalDate targetDate,
                                                              final BillingEventSet billingEvents, final boolean isDryRun, final InternalCallContext context) throws InvoiceApiException {
+        final ImmutableAccountData account;
         try {
-            final ImmutableAccountData account = accountApi.getImmutableAccountDataById(accountId, context);
-
-            final List<Invoice> invoices = billingEvents.isAccountAutoInvoiceOff() ?
-                                           ImmutableList.<Invoice>of() :
-                                           ImmutableList.<Invoice>copyOf(Collections2.transform(invoiceDao.getInvoicesByAccount(context),
-                                                                                                new Function<InvoiceModelDao, Invoice>() {
-                                                                                                    @Override
-                                                                                                    public Invoice apply(final InvoiceModelDao input) {
-                                                                                                        return new DefaultInvoice(input);
-                                                                                                    }
-                                                                                                }));
-
-            final Currency targetCurrency = account.getCurrency();
-
-            final UUID targetInvoiceId;
-            if (billingEvents.isAccountAutoInvoiceReuseDraft()) {
-                final InvoiceModelDao earliestDraftInvoice = invoiceDao.getEarliestDraftInvoiceByAccount(context);
-                targetInvoiceId = earliestDraftInvoice != null ? earliestDraftInvoice.getId() : null;
-            } else {
-                targetInvoiceId = null;
-            }
-
-            final InvoiceWithMetadata invoiceWithMetadata = generator.generateInvoice(account, billingEvents, invoices, targetInvoiceId, targetDate, targetCurrency, context);
-            final DefaultInvoice invoice = invoiceWithMetadata.getInvoice();
+            account = accountApi.getImmutableAccountDataById(accountId, context);
+        } catch (final AccountApiException e) {
+            log.error("Unable to generate invoice for accountId='{}', a future notification has NOT been recorded", accountId, e);
+            return null;
+        }
 
-            // Compute future notifications
-            final FutureAccountNotifications futureAccountNotifications = createNextFutureNotificationDate(invoiceWithMetadata, context);
+        final InvoiceWithMetadata invoiceWithMetadata = generateKillBillInvoice(account, targetDate, billingEvents, context);
+        final DefaultInvoice invoice = invoiceWithMetadata.getInvoice();
 
-            //
+        // Compute future notifications
+        final FutureAccountNotifications futureAccountNotifications = createNextFutureNotificationDate(invoiceWithMetadata, context);
 
-            // If invoice comes back null, there is nothing new to generate, we can bail early
-            //
-            if (invoice == null) {
-                if (isDryRun) {
-                    log.info("Generated null dryRun invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
-                } else {
-                    log.info("Generated null invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
+        // If invoice comes back null, there is nothing new to generate, we can bail early
+        if (invoice == null) {
+            if (isDryRun) {
+                log.info("Generated null dryRun invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
+            } else {
+                log.info("Generated null invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
 
-                    final BusInternalEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(),
-                                                                               context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
+                final BusInternalEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(),
+                                                                           context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
 
-                    commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
-                    postEvent(event);
-                }
-                return null;
+                commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
+                postEvent(event);
             }
+            return null;
+        }
 
+        boolean success = false;
+        try {
             // Generate missing credit (> 0 for generation and < 0 for use) prior we call the plugin
             final InvoiceItem cbaItemPreInvoicePlugins = computeCBAOnExistingInvoice(invoice, context);
             DefaultInvoice tmpInvoiceForInvoicePlugins = invoice;
@@ -453,21 +437,47 @@ public class InvoiceDispatcher {
                 invoiceModelDao.addInvoiceItems(invoiceItemModelDaos);
 
                 // Commit invoice on disk
-                final boolean isThereAnyItemsLeft = commitInvoiceAndSetFutureNotifications(account, invoiceModelDao, futureAccountNotifications, context);
+                commitInvoiceAndSetFutureNotifications(account, invoiceModelDao, futureAccountNotifications, context);
+                success = true;
+
+                try {
+                    setChargedThroughDates(invoice.getInvoiceItems(FixedPriceInvoiceItem.class), invoice.getInvoiceItems(RecurringInvoiceItem.class), context);
+                } catch (final SubscriptionBaseApiException e) {
+                    log.error("Failed handling SubscriptionBase change.", e);
+                    return null;
+                }
+            }
+        } finally {
+            // Make sure we always set future notifications in case of errors
+            if (!isDryRun && !success) {
+                commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
+            }
+        }
 
-                final boolean isRealInvoiceWithNonEmptyItems = isThereAnyItemsLeft ? isRealInvoiceWithItems : false;
+        return invoice;
+    }
 
-                setChargedThroughDates(invoice.getInvoiceItems(FixedPriceInvoiceItem.class), invoice.getInvoiceItems(RecurringInvoiceItem.class), context);
+    private InvoiceWithMetadata generateKillBillInvoice(final ImmutableAccountData account, final LocalDate targetDate, final BillingEventSet billingEvents, final InternalCallContext context) throws InvoiceApiException {
+        final List<Invoice> invoices = billingEvents.isAccountAutoInvoiceOff() ?
+                                       ImmutableList.<Invoice>of() :
+                                       ImmutableList.<Invoice>copyOf(Collections2.transform(invoiceDao.getInvoicesByAccount(context),
+                                                                                            new Function<InvoiceModelDao, Invoice>() {
+                                                                                                @Override
+                                                                                                public Invoice apply(final InvoiceModelDao input) {
+                                                                                                    return new DefaultInvoice(input);
+                                                                                                }
+                                                                                            }));
+        final Currency targetCurrency = account.getCurrency();
 
-            }
-            return invoice;
-        } catch (final AccountApiException e) {
-            log.error("Failed handling SubscriptionBase change.", e);
-            return null;
-        } catch (final SubscriptionBaseApiException e) {
-            log.error("Failed handling SubscriptionBase change.", e);
-            return null;
+        final UUID targetInvoiceId;
+        if (billingEvents.isAccountAutoInvoiceReuseDraft()) {
+            final InvoiceModelDao earliestDraftInvoice = invoiceDao.getEarliestDraftInvoiceByAccount(context);
+            targetInvoiceId = earliestDraftInvoice != null ? earliestDraftInvoice.getId() : null;
+        } else {
+            targetInvoiceId = null;
         }
+
+        return generator.generateInvoice(account, billingEvents, invoices, targetInvoiceId, targetDate, targetCurrency, context);
     }
 
     private FutureAccountNotifications createNextFutureNotificationDate(final InvoiceWithMetadata invoiceWithMetadata, final InternalCallContext context) {
@@ -549,17 +559,16 @@ public class InvoiceDispatcher {
         log.info(tmp.toString());
     }
 
-    private boolean commitInvoiceAndSetFutureNotifications(final ImmutableAccountData account,
-                                                           @Nullable final InvoiceModelDao invoiceModelDao,
-                                                           final FutureAccountNotifications futureAccountNotifications,
-                                                           final InternalCallContext context) throws SubscriptionBaseApiException, InvoiceApiException {
+    private void commitInvoiceAndSetFutureNotifications(final ImmutableAccountData account,
+                                                        @Nullable final InvoiceModelDao invoiceModelDao,
+                                                        final FutureAccountNotifications futureAccountNotifications,
+                                                        final InternalCallContext context) {
         final boolean isThereAnyItemsLeft = invoiceModelDao != null && !invoiceModelDao.getInvoiceItems().isEmpty();
         if (isThereAnyItemsLeft) {
             invoiceDao.createInvoice(invoiceModelDao, futureAccountNotifications, context);
         } else {
             invoiceDao.setFutureAccountNotificationsForEmptyInvoice(account.getId(), futureAccountNotifications, context);
         }
-        return isThereAnyItemsLeft;
     }
 
     private InvoiceItem computeCBAOnExistingInvoice(final Invoice invoice, final InternalCallContext context) throws InvoiceApiException {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index f81483d..bf7e879 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -31,13 +31,21 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
 import org.killbill.billing.events.InvoiceCreationInternalEvent;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
 import org.killbill.billing.invoice.api.user.DefaultInvoiceAdjustmentEvent;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,78 +53,157 @@ import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
-public class InvoiceListener {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceListener extends RetryableService implements InvoiceListenerService {
+
+    public static final String INVOICE_LISTENER_SERVICE_NAME = "invoice-listener-service";
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
 
     private final InvoiceDispatcher dispatcher;
     private final InternalCallContextFactory internalCallContextFactory;
-    private final AccountInternalApi accountApi;
     private final InvoiceInternalApi invoiceApi;
-    private final InvoiceConfig invoiceConfig;
-    private final Clock clock;
+    private final RetryableSubscriber retryableSubscriber;
+    private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
 
     @Inject
-    public InvoiceListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory,
-                           final InvoiceConfig invoiceConfig, final InvoiceDispatcher dispatcher, InvoiceInternalApi invoiceApi) {
-        this.accountApi = accountApi;
+    public InvoiceListener(final AccountInternalApi accountApi,
+                           final InternalCallContextFactory internalCallContextFactory,
+                           final InvoiceDispatcher dispatcher,
+                           final InvoiceInternalApi invoiceApi,
+                           final NotificationQueueService notificationQueueService,
+                           final Clock clock) {
+        super(notificationQueueService);
         this.dispatcher = dispatcher;
-        this.invoiceConfig = invoiceConfig;
         this.internalCallContextFactory = internalCallContextFactory;
-        this.clock = clock;
         this.invoiceApi = invoiceApi;
+
+        subscriberQueueHandler.subscribe(EffectiveSubscriptionInternalEvent.class,
+                                         new SubscriberAction<EffectiveSubscriptionInternalEvent>() {
+                                             @Override
+                                             public void run(final EffectiveSubscriptionInternalEvent event) {
+                                                 try {
+                                                     //  Skip future uncancel event
+                                                     //  Skip events which are marked as not being the last one
+                                                     if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
+                                                         event.getRemainingEventsForUserOperation() > 0) {
+                                                         return;
+                                                     }
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     dispatcher.processSubscriptionForInvoiceGeneration(event, context);
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(BlockingTransitionInternalEvent.class,
+                                         new SubscriberAction<BlockingTransitionInternalEvent>() {
+                                             @Override
+                                             public void run(final BlockingTransitionInternalEvent event) {
+                                                 // We are only interested in blockBilling or unblockBilling transitions.
+                                                 if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
+                                                     return;
+                                                 }
+
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
+                                                     dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(InvoiceCreationInternalEvent.class,
+                                         new SubscriberAction<InvoiceCreationInternalEvent>() {
+                                             @Override
+                                             public void run(final InvoiceCreationInternalEvent event) {
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+                                                     // catch children invoices and populate the parent summary invoice
+                                                     if (isChildrenAccountAndPaymentDelegated(account)) {
+                                                         dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
+                                                     }
+
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(DefaultInvoiceAdjustmentEvent.class,
+                                         new SubscriberAction<DefaultInvoiceAdjustmentEvent>() {
+                                             @Override
+                                             public void run(final DefaultInvoiceAdjustmentEvent event) {
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+                                                     // catch children invoices and populate the parent summary invoice
+                                                     if (isChildrenAccountAndPaymentDelegated(account)) {
+                                                         dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
+                                                     }
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+    }
+
+    @Override
+    public String getName() {
+        return INVOICE_LISTENER_SERVICE_NAME;
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        super.initialize("invoice-listener", subscriberQueueHandler);
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        super.start();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() throws NoSuchNotificationQueue {
+        super.stop();
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
-        try {
-            //  Skip future uncancel event
-            //  Skip events which are marked as not being the last one
-            if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
-                event.getRemainingEventsForUserOperation() > 0) {
-                return;
-            }
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            dispatcher.processSubscriptionForInvoiceGeneration(event, context);
-        } catch (InvoiceApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
-        // We are only interested in blockBilling or unblockBilling transitions.
-        if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
-            return;
-        }
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
-            dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
-        } catch (InvoiceApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        } catch (AccountApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscriptionForInvoiceGeneration(subscriptionId, context.toLocalDate(eventDateTime), context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
         }
     }
 
     public void handleEventForInvoiceNotification(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscriptionForInvoiceNotification(subscriptionId, context.toLocalDate(eventDateTime), context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
         }
     }
@@ -124,21 +211,7 @@ public class InvoiceListener {
     @AllowConcurrentEvents
     @Subscribe
     public void handleChildrenInvoiceCreationEvent(final InvoiceCreationInternalEvent event) {
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
-            // catch children invoices and populate the parent summary invoice
-            if (isChildrenAccountAndPaymentDelegated(account)) {
-                dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
-            }
-
-        } catch (InvoiceApiException e) {
-            log.error(e.getMessage());
-        } catch (AccountApiException e) {
-            log.error(e.getMessage());
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
@@ -149,7 +222,7 @@ public class InvoiceListener {
         try {
             final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Commit Invoice", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             invoiceApi.commitInvoice(invoiceId, context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             // In case we commit parent invoice earlier we expect to see an INVOICE_INVALID_STATUS status
             if (ErrorCode.INVOICE_INVALID_STATUS.getCode() != e.getCode()) {
                 log.error(e.getMessage());
@@ -160,21 +233,6 @@ public class InvoiceListener {
     @AllowConcurrentEvents
     @Subscribe
     public void handleChildrenInvoiceAdjustmentEvent(final DefaultInvoiceAdjustmentEvent event) {
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
-            // catch children invoices and populate the parent summary invoice
-            if (isChildrenAccountAndPaymentDelegated(account)) {
-                dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
-            }
-
-        } catch (InvoiceApiException e) {
-            log.error(e.getMessage());
-        } catch (AccountApiException e) {
-            log.error(e.getMessage());
-        }
+        retryableSubscriber.handleEvent(event);
     }
-
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index ab9b982..afb7750 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -24,10 +24,20 @@ import org.killbill.billing.ObjectType;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.events.ControlTagDeletionInternalEvent;
 import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.platform.api.KillbillService;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.tag.ControlTagType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,28 +45,64 @@ import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
-public class InvoiceTagHandler {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceTagHandler extends RetryableService implements KillbillService {
+
+    private static final String INVOICE_TAG_HANDLER_SERVICE_NAME = "invoice-tag-handler-service";
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceTagHandler.class);
 
     private final InvoiceDispatcher dispatcher;
-    private final InternalCallContextFactory internalCallContextFactory;
+    private final RetryableSubscriber retryableSubscriber;
+
+    private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
 
     @Inject
-    public InvoiceTagHandler(final InvoiceDispatcher dispatcher,
+    public InvoiceTagHandler(final Clock clock,
+                             final InvoiceDispatcher dispatcher,
+                             final NotificationQueueService notificationQueueService,
                              final InternalCallContextFactory internalCallContextFactory) {
+        super(notificationQueueService);
         this.dispatcher = dispatcher;
-        this.internalCallContextFactory = internalCallContextFactory;
+
+        final SubscriberAction<ControlTagDeletionInternalEvent> action = new SubscriberAction<ControlTagDeletionInternalEvent>() {
+            @Override
+            public void run(final ControlTagDeletionInternalEvent event) {
+                if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
+                    final UUID accountId = event.getObjectId();
+                    final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                    processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
+                }
+            }
+        };
+        subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class, action);
+        this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+    }
+
+    @Override
+    public String getName() {
+        return INVOICE_TAG_HANDLER_SERVICE_NAME;
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        super.initialize("invoice-tag-handler", subscriberQueueHandler);
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
-        if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
-            final UUID accountId = event.getObjectId();
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
-        }
+        retryableSubscriber.handleEvent(event);
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        super.start();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() throws NoSuchNotificationQueue {
+        super.stop();
     }
 
     private void processUnpaid_AUTO_INVOICING_OFF_invoices(final UUID accountId, final InternalCallContext context) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 644cec3..85391a5 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -25,45 +27,50 @@ import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.clock.Clock;
 import org.killbill.notificationq.api.NotificationEvent;
 import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
 import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.killbill.queue.retry.RetryableHandler;
+import org.killbill.queue.retry.RetryableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
-public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
-
-    private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+public class DefaultNextBillingDateNotifier extends RetryableService implements NextBillingDateNotifier {
 
     public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
 
+    private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+    private final Clock clock;
     private final NotificationQueueService notificationQueueService;
     private final SubscriptionBaseInternalApi subscriptionApi;
     private final InvoiceListener listener;
-    private final InternalCallContextFactory callContextFactory;
+    private final InternalCallContextFactory internalCallContextFactory;
 
     private NotificationQueue nextBillingQueue;
 
     @Inject
-    public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
+    public DefaultNextBillingDateNotifier(final Clock clock,
+                                          final NotificationQueueService notificationQueueService,
                                           final SubscriptionBaseInternalApi subscriptionApi,
                                           final InvoiceListener listener,
-                                          final InternalCallContextFactory callContextFactory) {
+                                          final InternalCallContextFactory internalCallContextFactory) {
+        super(notificationQueueService);
+        this.clock = clock;
         this.notificationQueueService = notificationQueueService;
         this.subscriptionApi = subscriptionApi;
         this.listener = listener;
-        this.callContextFactory = callContextFactory;
+        this.internalCallContextFactory = internalCallContextFactory;
     }
 
     @Override
     public void initialize() throws NotificationQueueAlreadyExists {
-
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -77,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                 // Just to ensure compatibility with json that might not have that targetDate field (old versions < 0.13.6)
                 final DateTime targetDate = key.getTargetDate() != null ? key.getTargetDate() : eventDate;
                 try {
-                    final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), callContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
+                    final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
                     if (subscription == null) {
                         log.warn("Unable to retrieve subscriptionId='{}' for event {}", key.getUuidKey(), key);
                         return;
@@ -88,19 +95,24 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                     } else {
                         processEventForInvoiceGeneration(key.getUuidKey(), targetDate, userToken, accountRecordId, tenantRecordId);
                     }
-                } catch (SubscriptionBaseApiException e) {
+                } catch (final SubscriptionBaseApiException e) {
                     log.warn("Error retrieving subscriptionId='{}'", key.getUuidKey(), e);
                 }
             }
         };
 
+        final NotificationQueueHandler retryableHandler = new RetryableHandler(clock, this, notificationQueueHandler);
         nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                             NEXT_BILLING_DATE_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler);
+                                                                            retryableHandler);
+
+        super.initialize(nextBillingQueue, notificationQueueHandler);
     }
 
     @Override
     public void start() {
+        super.start();
+
         nextBillingQueue.startQueue();
     }
 
@@ -110,6 +122,8 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
             nextBillingQueue.stopQueue();
             notificationQueueService.deleteNotificationQueue(nextBillingQueue.getServiceName(), nextBillingQueue.getQueueName());
         }
+
+        super.stop();
     }
 
     private void processEventForInvoiceGeneration(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
index 128616f..ef72b84 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
@@ -26,6 +26,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
 import org.killbill.clock.Clock;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationQueueService;
 
 public class TestInvoiceNotificationQListener extends InvoiceListener {
 
@@ -33,8 +34,13 @@ public class TestInvoiceNotificationQListener extends InvoiceListener {
     UUID latestSubscriptionId = null;
 
     @Inject
-    public TestInvoiceNotificationQListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory, final InvoiceDispatcher dispatcher, final InvoiceInternalApi invoiceApi) {
-        super(accountApi, clock, internalCallContextFactory, null, dispatcher, invoiceApi);
+    public TestInvoiceNotificationQListener(final AccountInternalApi accountApi,
+                                            final Clock clock,
+                                            final InternalCallContextFactory internalCallContextFactory,
+                                            final InvoiceDispatcher dispatcher,
+                                            final InvoiceInternalApi invoiceApi,
+                                            final NotificationQueueService notificationQueueService) {
+        super(accountApi, internalCallContextFactory, dispatcher, invoiceApi, notificationQueueService, clock);
     }
 
     @Override
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
index 8d30a76..6cbcebb 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
@@ -821,6 +821,7 @@ public class AccountResource extends JaxRsResourceBase {
                                         @PathParam("accountId") final String accountIdStr,
                                         @QueryParam(QUERY_PAYMENT_METHOD_IS_DEFAULT) @DefaultValue("false") final Boolean isDefault,
                                         @QueryParam(QUERY_PAY_ALL_UNPAID_INVOICES) @DefaultValue("false") final Boolean payAllUnpaidInvoices,
+                                        @QueryParam(QUERY_PAYMENT_CONTROL_PLUGIN_NAME) final List<String> paymentControlPluginNames,
                                         @QueryParam(QUERY_PLUGIN_PROPERTY) final List<String> pluginPropertiesString,
                                         @HeaderParam(HDR_CREATED_BY) final String createdBy,
                                         @HeaderParam(HDR_REASON) final String reason,
@@ -845,7 +846,9 @@ public class AccountResource extends JaxRsResourceBase {
             return Response.status(Status.BAD_REQUEST).build();
         }
 
-        final UUID paymentMethodId = paymentApi.addPaymentMethod(account, data.getExternalKey(), data.getPluginName(), isDefault, data.getPluginDetail(), pluginProperties, callContext);
+
+        final PaymentOptions paymentOptions = createControlPluginApiPaymentOptions(paymentControlPluginNames);
+        final UUID paymentMethodId = paymentApi.addPaymentMethodWithPaymentControl(account, data.getExternalKey(), data.getPluginName(), isDefault, data.getPluginDetail(), pluginProperties, paymentOptions, callContext);
         if (payAllUnpaidInvoices && unpaidInvoices.size() > 0) {
             for (final Invoice invoice : unpaidInvoices) {
                 createPurchaseForInvoice(account, invoice.getId(), invoice.getBalance(), paymentMethodId, false, null, null, pluginProperties, callContext);

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index aa616f6..817b368 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>killbill-oss-parent</artifactId>
         <groupId>org.kill-bill.billing</groupId>
-        <version>0.141.1-SNAPSHOT</version>
+        <version>0.141.4</version>
     </parent>
     <artifactId>killbill</artifactId>
     <version>0.19.0-SNAPSHOT</version>

util/pom.xml 4(+4 -0)

diff --git a/util/pom.xml b/util/pom.xml
index b6369da..8222bf9 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -210,6 +210,10 @@
         </dependency>
         <dependency>
             <groupId>org.kill-bill.billing.plugin</groupId>
+            <artifactId>killbill-plugin-api-invoice</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kill-bill.billing.plugin</groupId>
             <artifactId>killbill-plugin-api-notification</artifactId>
         </dependency>
         <dependency>
diff --git a/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql b/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
index 49d05f7..6fac566 100644
--- a/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
+++ b/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
@@ -45,3 +45,28 @@ CREATE OR REPLACE FUNCTION hour(ts TIMESTAMP WITH TIME ZONE) RETURNS INTEGER AS 
         RETURN result;
     END;
 $$ LANGUAGE plpgsql IMMUTABLE;
+
+/* Alter 'serial' columns to 'bigint' because 'serial' is 32bit in PG while 64bit in MySQL */
+CREATE OR REPLACE FUNCTION update_serial_to_bigint_oncreate()
+        RETURNS event_trigger LANGUAGE plpgsql AS $$
+DECLARE
+    r record;
+    matches text[];
+BEGIN
+    FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
+    LOOP
+	    SELECT regexp_matches(current_query(), E'\\m(\\w+)\\s+serial\\M') INTO matches;
+	    IF r.object_type = 'table' AND array_length(matches, 1) > 0 THEN
+            RAISE NOTICE 'Altering % % column % from serial to bigint',
+                     r.object_type,
+                     r.object_identity,
+                     matches[1];
+             EXECUTE 'ALTER TABLE ' || r.object_identity || ' ALTER COLUMN ' || matches[1] || ' TYPE bigint';
+        END IF;
+    END LOOP;
+END
+$$;
+
+CREATE EVENT TRIGGER update_serial_to_bigint_oncreate
+   ON ddl_command_end WHEN TAG IN ('CREATE TABLE')
+   EXECUTE PROCEDURE update_serial_to_bigint_oncreate();
diff --git a/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
new file mode 100644
index 0000000..c33b881
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
+import org.killbill.billing.events.ControlTagCreationInternalEvent;
+import org.killbill.billing.events.ControlTagDeletionInternalEvent;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.tag.DefaultTagDefinition;
+import org.killbill.billing.util.tag.api.user.DefaultControlTagCreationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestRetryableService extends UtilTestSuiteWithEmbeddedDB {
+
+    private static final String TEST_LISTENER = "TestListener";
+    private static final ImmutableList<Period> RETRY_SCHEDULE = ImmutableList.<Period>of(Period.hours(1), Period.days(1));
+
+    private ControlTagCreationInternalEvent event;
+    private TestListener testListener;
+
+    @BeforeClass(groups = "slow")
+    public void setUpClass() throws Exception {
+        final ImmutableAccountData immutableAccountData = Mockito.mock(ImmutableAccountData.class);
+        Mockito.when(immutableAccountInternalApi.getImmutableAccountDataByRecordId(Mockito.<Long>eq(internalCallContext.getAccountRecordId()), Mockito.<InternalTenantContext>any())).thenReturn(immutableAccountData);
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void setUp() throws Exception {
+        event = new DefaultControlTagCreationEvent(UUID.randomUUID(),
+                                                   UUID.randomUUID(),
+                                                   ObjectType.ACCOUNT,
+                                                   new DefaultTagDefinition("name", "description", false),
+                                                   internalCallContext.getAccountRecordId(),
+                                                   internalCallContext.getTenantRecordId(),
+                                                   UUID.randomUUID());
+
+        testListener = new TestListener();
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @AfterMethod(groups = "slow")
+    public void tearDown() throws Exception {
+        testListener.stop();
+    }
+
+    @Test(groups = "slow")
+    public void testFixUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        testListener.throwRetryableException = false;
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 1);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testGiveUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        // Give up
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testNotRetryableException() throws Exception {
+        testListener.throwOtherException = true;
+
+        try {
+            testListener.handleEvent(event);
+            Assert.fail("Expected exception");
+        } catch (final IllegalArgumentException e) {
+            Assert.assertTrue(true);
+        }
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    private List<NotificationEventWithMetadata> getFutureRetryableEvents() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = queueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, TEST_LISTENER);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private final class TestListener extends RetryableService {
+
+        private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
+        private final Collection<BusInternalEvent> eventsSeen = new LinkedList<BusInternalEvent>();
+
+        private final RetryableSubscriber retryableSubscriber;
+
+        private boolean throwRetryableException = false;
+        private boolean throwOtherException = false;
+
+        public TestListener() {
+            super(queueService);
+
+            subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class,
+                                             new SubscriberAction<ControlTagDeletionInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagDeletionInternalEvent event) {
+                                                     Assert.fail("No handler registered");
+                                                 }
+                                             });
+            subscriberQueueHandler.subscribe(ControlTagCreationInternalEvent.class,
+                                             new SubscriberAction<ControlTagCreationInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagCreationInternalEvent event) {
+                                                     if (throwRetryableException) {
+                                                         throw new InvoicePluginApiRetryException(RETRY_SCHEDULE);
+                                                     } else if (throwOtherException) {
+                                                         throw new IllegalArgumentException("EXPECTED");
+                                                     } else {
+                                                         eventsSeen.add(event);
+                                                     }
+                                                 }
+                                             });
+            this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+
+            initialize(TEST_LISTENER, subscriberQueueHandler);
+            start();
+        }
+
+        public void handleEvent(final ControlTagCreationInternalEvent event) {
+            retryableSubscriber.handleEvent(event);
+        }
+    }
+}