killbill-aplcache

Merge pull request #785 from killbill/fix-for-767 Built-in

9/27/2017 3:12:43 PM

Details

diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
new file mode 100644
index 0000000..1f54f9c
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.api;
+
+import org.killbill.billing.platform.api.KillbillService;
+
+public interface InvoiceListenerService extends KillbillService {
+}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 606d7dc..fc49f94 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014 Groupon, Inc
- * Copyright 2014 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -27,10 +27,8 @@ import org.joda.time.DateTime;
 import org.killbill.billing.DBTestingHelper;
 import org.killbill.billing.account.api.Account;
 import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.beatrix.extbus.DefaultBusExternalEvent;
 import org.killbill.billing.callcontext.DefaultCallContext;
 import org.killbill.billing.catalog.api.BillingPeriod;
-import org.killbill.billing.catalog.api.PriceListSet;
 import org.killbill.billing.catalog.api.ProductCategory;
 import org.killbill.billing.entitlement.api.DefaultEntitlement;
 import org.killbill.billing.notification.plugin.api.ExtBusEvent;
@@ -46,11 +44,6 @@ import org.killbill.billing.util.callcontext.CallContext;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.jackson.ObjectMapper;
-import org.killbill.billing.util.nodes.NodeCommand;
-import org.killbill.billing.util.nodes.NodeCommandMetadata;
-import org.killbill.billing.util.nodes.NodeCommandProperty;
-import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
-import org.killbill.billing.util.nodes.SystemNodeCommandType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -58,65 +51,30 @@ import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.Subscribe;
 
-import static org.awaitility.Awaitility.await;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 import static org.testng.Assert.assertNotNull;
 
 public class TestPublicBus extends TestIntegrationBase {
 
-    private PublicListener publicListener;
+    private static final ObjectMapper mapper = new ObjectMapper();
 
+    private PublicListener publicListener;
     private AtomicInteger externalBusCount;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     @Override
     protected KillbillConfigSource getConfigSource() {
-        ImmutableMap additionalProperties = new ImmutableMap.Builder()
-                .put("org.killbill.billing.util.broadcast.rate", "500ms")
-                .build();
+        final ImmutableMap<String, String> additionalProperties = new ImmutableMap.Builder<String, String>().put("org.killbill.billing.util.broadcast.rate", "500ms")
+                                                                                                            .build();
         return getConfigSource("/beatrix.properties", additionalProperties);
     }
 
-
-    public class PublicListener {
-
-        @Subscribe
-        public void handleExternalEvents(final ExtBusEvent event) {
-            log.info("GOT EXT EVENT " + event);
-
-            if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
-                event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
-                try {
-                    final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
-                    Assert.assertNotNull(obj.getBundleExternalKey());
-                    Assert.assertNotNull(obj.getActionType());
-                } catch (JsonParseException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                } catch (JsonMappingException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                } catch (IOException e) {
-                    Assert.fail("Could not deserialize metada section", e);
-                }
-            }
-
-            externalBusCount.incrementAndGet();
-
-        }
-    }
-
     @Override
     @BeforeMethod(groups = "slow")
     public void beforeMethod() throws Exception {
-
         /*
         We copy the initialization instead of invoking the super method so we can add the registration
         of the publicBus event;
@@ -162,7 +120,6 @@ public class TestPublicBus extends TestIntegrationBase {
 
     @Test(groups = "slow")
     public void testSimple() throws Exception {
-
         final DateTime initialDate = new DateTime(2012, 2, 1, 0, 3, 42, 0, testTimeZone);
         final int billingDay = 2;
 
@@ -189,7 +146,6 @@ public class TestPublicBus extends TestIntegrationBase {
 
         addDaysAndCheckForCompletion(31, NextEvent.PHASE, NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
 
-
         await().atMost(10, SECONDS).until(new Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -197,12 +153,10 @@ public class TestPublicBus extends TestIntegrationBase {
                 return externalBusCount.get() == 10;
             }
         });
-
     }
 
     @Test(groups = "slow")
     public void testTenantKVChange() throws Exception {
-
         final TenantData tenantData = new DefaultTenant(null, clock.getUTCNow(), clock.getUTCNow(), "MY_TENANT", "key", "s3Cr3T");
         final CallContext contextWithNoTenant = new DefaultCallContext(null, null, "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
         final Tenant tenant = tenantUserApi.createTenant(tenantData, contextWithNoTenant);
@@ -219,4 +173,34 @@ public class TestPublicBus extends TestIntegrationBase {
             }
         });
     }
+
+    public class PublicListener {
+
+        @Subscribe
+        public void handleExternalEvents(final ExtBusEvent event) {
+            log.info("GOT EXT EVENT " + event);
+
+            if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
+                event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
+                try {
+                    final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
+                    Assert.assertNotNull(obj.getBundleExternalKey());
+                    Assert.assertNotNull(obj.getActionType());
+                } catch (final JsonParseException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                } catch (final JsonMappingException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                } catch (final IOException e) {
+                    Assert.fail("Could not deserialize metada section", e);
+                }
+            }
+
+            externalBusCount.incrementAndGet();
+
+        }
+    }
 }
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
new file mode 100644
index 0000000..6ae2936
--- /dev/null
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.beatrix.integration;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountData;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.ProductCategory;
+import org.killbill.billing.entitlement.api.DefaultEntitlement;
+import org.killbill.billing.invoice.api.DefaultInvoiceService;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.invoice.model.TaxInvoiceItem;
+import org.killbill.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApi;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestWithInvoicePlugin extends TestIntegrationBase {
+
+    @Inject
+    private OSGIServiceRegistration<InvoicePluginApi> pluginRegistry;
+
+    @Inject
+    private NotificationQueueService notificationQueueService;
+
+    private TestInvoicePluginApi testInvoicePluginApi;
+
+    @BeforeClass(groups = "slow")
+    public void beforeClass() throws Exception {
+        super.beforeClass();
+
+        this.testInvoicePluginApi = new TestInvoicePluginApi();
+        pluginRegistry.registerService(new OSGIServiceDescriptor() {
+            @Override
+            public String getPluginSymbolicName() {
+                return "TestInvoicePluginApi";
+            }
+
+            @Override
+            public String getPluginName() {
+                return "TestInvoicePluginApi";
+            }
+
+            @Override
+            public String getRegistrationName() {
+                return "TestInvoicePluginApi";
+            }
+        }, testInvoicePluginApi);
+    }
+
+    @Test(groups = "slow")
+    public void testWithRetries() throws Exception {
+        // We take april as it has 30 days (easier to play with BCD)
+        // Set clock to the initial start date - we implicitly assume here that the account timezone is UTC
+        clock.setDay(new LocalDate(2012, 4, 1));
+
+        final AccountData accountData = getAccountData(1);
+        final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+        accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+        // Make invoice plugin fail
+        testInvoicePluginApi.shouldThrowException = true;
+
+        // Create original subscription (Trial PHASE)
+        final DefaultEntitlement bpSubscription = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK);
+        subscriptionChecker.checkSubscriptionCreated(bpSubscription.getId(), internalCallContext);
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 0);
+
+        // Verify bus event has moved to the retry service (can't easily check the timestamp unfortunately)
+        checkRetryBusEvents();
+
+        // Add 5'
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        checkRetryBusEvents();
+
+        // Fix invoice plugin
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDeltaFromReality(10 * 60 * 1000);
+        assertListenerStatus();
+        // No notification in the main queue at this point (the PHASE event is the trigger for the next one)
+        checkNotificationsNoRetry(0);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 1);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    1,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.FIXED, BigDecimal.ZERO),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.setDay(new LocalDate("2012-05-01"));
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    2,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), new LocalDate(2012, 6, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        // Make invoice plugin fail again
+        testInvoicePluginApi.shouldThrowException = true;
+
+        clock.addMonths(1);
+        assertListenerStatus();
+
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+
+        // Verify notification has moved to the retry service
+        checkRetryNotifications("2012-06-01T00:05:00");
+
+        // Add 5'
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        checkRetryNotifications("2012-06-01T00:15:00");
+
+        // Fix invoice plugin
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+        clock.addDeltaFromReality(10 * 60 * 1000);
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        // Invoice was generated
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+        invoiceChecker.checkInvoice(account.getId(),
+                                    3,
+                                    callContext,
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), new LocalDate(2012, 7, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+                                    new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+        // Make invoice plugin fail again
+        testInvoicePluginApi.shouldThrowException = true;
+
+        clock.setTime(new DateTime("2012-07-01T00:00:00"));
+        assertListenerStatus();
+
+        // Invoice failed to generate
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+
+        // Verify notification has moved to the retry service
+        checkRetryNotifications("2012-07-01T00:05:00");
+
+        testInvoicePluginApi.shouldThrowException = false;
+
+        busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+        clock.addDeltaFromReality(5 * 60 * 1000);
+        assertListenerStatus();
+        checkNotificationsNoRetry(1);
+
+        assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 4);
+    }
+
+    private void checkRetryBusEvents() throws NoSuchNotificationQueue {
+        // Verify notification(s) moved to the retry queue
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                final List<NotificationEventWithMetadata> futureInvoiceRetryableBusEvents = getFutureInvoiceRetryableBusEvents();
+                return futureInvoiceRetryableBusEvents.size() == 1;
+            }
+        });
+        assertEquals(getFutureInvoiceRetryableBusEvents().size(), 1);
+        assertEquals(getFutureInvoiceNotifications().size(), 0);
+    }
+
+    private void checkRetryNotifications(final String retryDateTime) throws NoSuchNotificationQueue {
+        // Verify notification(s) moved to the retry queue
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                final List<NotificationEventWithMetadata> futureInvoiceRetryableNotifications = getFutureInvoiceRetryableNotifications();
+                return futureInvoiceRetryableNotifications.size() == 1 && futureInvoiceRetryableNotifications.get(0).getEffectiveDate().compareTo(new DateTime(retryDateTime, DateTimeZone.UTC)) == 0;
+            }
+        });
+        assertEquals(getFutureInvoiceRetryableNotifications().size(), 1);
+        assertEquals(getFutureInvoiceNotifications().size(), 0);
+    }
+
+    private void checkNotificationsNoRetry(final int main) throws NoSuchNotificationQueue {
+        assertEquals(getFutureInvoiceRetryableNotifications().size(), 0);
+        assertEquals(getFutureInvoiceNotifications().size(), main);
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceNotifications() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceRetryableNotifications() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private List<NotificationEventWithMetadata> getFutureInvoiceRetryableBusEvents() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, "invoice-listener");
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    public class TestInvoicePluginApi implements InvoicePluginApi {
+
+        boolean shouldThrowException = false;
+
+        @Override
+        public List<InvoiceItem> getAdditionalInvoiceItems(final Invoice invoice, final boolean isDryRun, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) {
+            if (shouldThrowException) {
+                throw new InvoicePluginApiRetryException();
+            }
+            return ImmutableList.<InvoiceItem>of(createTaxInvoiceItem(invoice));
+        }
+
+        private InvoiceItem createTaxInvoiceItem(final Invoice invoice) {
+            return new TaxInvoiceItem(invoice.getId(), invoice.getAccountId(), null, "Tax Item", clock.getUTCNow().toLocalDate(), BigDecimal.ONE, invoice.getCurrency());
+        }
+    }
+}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index 2fc4a1f..eabea2e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -26,6 +26,7 @@ import org.killbill.billing.invoice.ParkedAccountsManager;
 import org.killbill.billing.invoice.api.DefaultInvoiceService;
 import org.killbill.billing.invoice.api.InvoiceApiHelper;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
 import org.killbill.billing.invoice.api.InvoicePaymentApi;
 import org.killbill.billing.invoice.api.InvoiceService;
 import org.killbill.billing.invoice.api.InvoiceUserApi;
@@ -99,8 +100,9 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
         bind(InvoiceConfig.class).to(MultiTenantInvoiceConfig.class).asEagerSingleton();
     }
 
-    protected void installInvoiceService() {
+    protected void installInvoiceServices() {
         bind(InvoiceService.class).to(DefaultInvoiceService.class).asEagerSingleton();
+        bind(InvoiceListenerService.class).to(InvoiceListener.class).asEagerSingleton();
     }
 
     protected void installResourceBundleFactory() {
@@ -143,7 +145,7 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
         installConfig();
 
         installInvoicePluginApi();
-        installInvoiceService();
+        installInvoiceServices();
         installNotifiers();
         installInvoiceDispatcher();
         installInvoiceListener();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index f81483d..bf7e879 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -31,13 +31,21 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
 import org.killbill.billing.events.InvoiceCreationInternalEvent;
 import org.killbill.billing.invoice.api.InvoiceApiException;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
 import org.killbill.billing.invoice.api.user.DefaultInvoiceAdjustmentEvent;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
 import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,78 +53,157 @@ import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
-public class InvoiceListener {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceListener extends RetryableService implements InvoiceListenerService {
+
+    public static final String INVOICE_LISTENER_SERVICE_NAME = "invoice-listener-service";
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
 
     private final InvoiceDispatcher dispatcher;
     private final InternalCallContextFactory internalCallContextFactory;
-    private final AccountInternalApi accountApi;
     private final InvoiceInternalApi invoiceApi;
-    private final InvoiceConfig invoiceConfig;
-    private final Clock clock;
+    private final RetryableSubscriber retryableSubscriber;
+    private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
 
     @Inject
-    public InvoiceListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory,
-                           final InvoiceConfig invoiceConfig, final InvoiceDispatcher dispatcher, InvoiceInternalApi invoiceApi) {
-        this.accountApi = accountApi;
+    public InvoiceListener(final AccountInternalApi accountApi,
+                           final InternalCallContextFactory internalCallContextFactory,
+                           final InvoiceDispatcher dispatcher,
+                           final InvoiceInternalApi invoiceApi,
+                           final NotificationQueueService notificationQueueService,
+                           final Clock clock) {
+        super(notificationQueueService);
         this.dispatcher = dispatcher;
-        this.invoiceConfig = invoiceConfig;
         this.internalCallContextFactory = internalCallContextFactory;
-        this.clock = clock;
         this.invoiceApi = invoiceApi;
+
+        subscriberQueueHandler.subscribe(EffectiveSubscriptionInternalEvent.class,
+                                         new SubscriberAction<EffectiveSubscriptionInternalEvent>() {
+                                             @Override
+                                             public void run(final EffectiveSubscriptionInternalEvent event) {
+                                                 try {
+                                                     //  Skip future uncancel event
+                                                     //  Skip events which are marked as not being the last one
+                                                     if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
+                                                         event.getRemainingEventsForUserOperation() > 0) {
+                                                         return;
+                                                     }
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     dispatcher.processSubscriptionForInvoiceGeneration(event, context);
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(BlockingTransitionInternalEvent.class,
+                                         new SubscriberAction<BlockingTransitionInternalEvent>() {
+                                             @Override
+                                             public void run(final BlockingTransitionInternalEvent event) {
+                                                 // We are only interested in blockBilling or unblockBilling transitions.
+                                                 if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
+                                                     return;
+                                                 }
+
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
+                                                     dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(InvoiceCreationInternalEvent.class,
+                                         new SubscriberAction<InvoiceCreationInternalEvent>() {
+                                             @Override
+                                             public void run(final InvoiceCreationInternalEvent event) {
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+                                                     // catch children invoices and populate the parent summary invoice
+                                                     if (isChildrenAccountAndPaymentDelegated(account)) {
+                                                         dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
+                                                     }
+
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        subscriberQueueHandler.subscribe(DefaultInvoiceAdjustmentEvent.class,
+                                         new SubscriberAction<DefaultInvoiceAdjustmentEvent>() {
+                                             @Override
+                                             public void run(final DefaultInvoiceAdjustmentEvent event) {
+                                                 try {
+                                                     final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                                                     final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+                                                     // catch children invoices and populate the parent summary invoice
+                                                     if (isChildrenAccountAndPaymentDelegated(account)) {
+                                                         dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
+                                                     }
+                                                 } catch (final InvoiceApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 } catch (final AccountApiException e) {
+                                                     log.warn("Unable to process event {}", event, e);
+                                                 }
+                                             }
+                                         });
+        this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+    }
+
+    @Override
+    public String getName() {
+        return INVOICE_LISTENER_SERVICE_NAME;
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        super.initialize("invoice-listener", subscriberQueueHandler);
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        super.start();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() throws NoSuchNotificationQueue {
+        super.stop();
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
-        try {
-            //  Skip future uncancel event
-            //  Skip events which are marked as not being the last one
-            if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
-                event.getRemainingEventsForUserOperation() > 0) {
-                return;
-            }
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            dispatcher.processSubscriptionForInvoiceGeneration(event, context);
-        } catch (InvoiceApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
-        // We are only interested in blockBilling or unblockBilling transitions.
-        if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
-            return;
-        }
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
-            dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
-        } catch (InvoiceApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        } catch (AccountApiException e) {
-            log.warn("Unable to process event {}", event, e);
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscriptionForInvoiceGeneration(subscriptionId, context.toLocalDate(eventDateTime), context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
         }
     }
 
     public void handleEventForInvoiceNotification(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+        final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
         try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             dispatcher.processSubscriptionForInvoiceNotification(subscriptionId, context.toLocalDate(eventDateTime), context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
         }
     }
@@ -124,21 +211,7 @@ public class InvoiceListener {
     @AllowConcurrentEvents
     @Subscribe
     public void handleChildrenInvoiceCreationEvent(final InvoiceCreationInternalEvent event) {
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
-            // catch children invoices and populate the parent summary invoice
-            if (isChildrenAccountAndPaymentDelegated(account)) {
-                dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
-            }
-
-        } catch (InvoiceApiException e) {
-            log.error(e.getMessage());
-        } catch (AccountApiException e) {
-            log.error(e.getMessage());
-        }
+        retryableSubscriber.handleEvent(event);
     }
 
     private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
@@ -149,7 +222,7 @@ public class InvoiceListener {
         try {
             final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Commit Invoice", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
             invoiceApi.commitInvoice(invoiceId, context);
-        } catch (InvoiceApiException e) {
+        } catch (final InvoiceApiException e) {
             // In case we commit parent invoice earlier we expect to see an INVOICE_INVALID_STATUS status
             if (ErrorCode.INVOICE_INVALID_STATUS.getCode() != e.getCode()) {
                 log.error(e.getMessage());
@@ -160,21 +233,6 @@ public class InvoiceListener {
     @AllowConcurrentEvents
     @Subscribe
     public void handleChildrenInvoiceAdjustmentEvent(final DefaultInvoiceAdjustmentEvent event) {
-
-        try {
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
-            // catch children invoices and populate the parent summary invoice
-            if (isChildrenAccountAndPaymentDelegated(account)) {
-                dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
-            }
-
-        } catch (InvoiceApiException e) {
-            log.error(e.getMessage());
-        } catch (AccountApiException e) {
-            log.error(e.getMessage());
-        }
+        retryableSubscriber.handleEvent(event);
     }
-
 }
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index ab9b982..afb7750 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -1,7 +1,7 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
  * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
@@ -24,10 +24,20 @@ import org.killbill.billing.ObjectType;
 import org.killbill.billing.callcontext.InternalCallContext;
 import org.killbill.billing.events.ControlTagDeletionInternalEvent;
 import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.platform.api.KillbillService;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
 import org.killbill.billing.util.callcontext.CallOrigin;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.callcontext.UserType;
 import org.killbill.billing.util.tag.ControlTagType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,28 +45,64 @@ import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
-public class InvoiceTagHandler {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceTagHandler extends RetryableService implements KillbillService {
+
+    private static final String INVOICE_TAG_HANDLER_SERVICE_NAME = "invoice-tag-handler-service";
 
     private static final Logger log = LoggerFactory.getLogger(InvoiceTagHandler.class);
 
     private final InvoiceDispatcher dispatcher;
-    private final InternalCallContextFactory internalCallContextFactory;
+    private final RetryableSubscriber retryableSubscriber;
+
+    private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
 
     @Inject
-    public InvoiceTagHandler(final InvoiceDispatcher dispatcher,
+    public InvoiceTagHandler(final Clock clock,
+                             final InvoiceDispatcher dispatcher,
+                             final NotificationQueueService notificationQueueService,
                              final InternalCallContextFactory internalCallContextFactory) {
+        super(notificationQueueService);
         this.dispatcher = dispatcher;
-        this.internalCallContextFactory = internalCallContextFactory;
+
+        final SubscriberAction<ControlTagDeletionInternalEvent> action = new SubscriberAction<ControlTagDeletionInternalEvent>() {
+            @Override
+            public void run(final ControlTagDeletionInternalEvent event) {
+                if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
+                    final UUID accountId = event.getObjectId();
+                    final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+                    processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
+                }
+            }
+        };
+        subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class, action);
+        this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+    }
+
+    @Override
+    public String getName() {
+        return INVOICE_TAG_HANDLER_SERVICE_NAME;
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+    public void initialize() {
+        super.initialize("invoice-tag-handler", subscriberQueueHandler);
     }
 
     @AllowConcurrentEvents
     @Subscribe
     public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
-        if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
-            final UUID accountId = event.getObjectId();
-            final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
-            processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
-        }
+        retryableSubscriber.handleEvent(event);
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+    public void start() {
+        super.start();
+    }
+
+    @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+    public void stop() throws NoSuchNotificationQueue {
+        super.stop();
     }
 
     private void processUnpaid_AUTO_INVOICING_OFF_invoices(final UUID accountId, final InternalCallContext context) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 644cec3..85391a5 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -1,7 +1,9 @@
 /*
  * Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
  *
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
  * (the "License"); you may not use this file except in compliance with the
  * License.  You may obtain a copy of the License at:
  *
@@ -25,45 +27,50 @@ import org.killbill.billing.subscription.api.SubscriptionBase;
 import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.clock.Clock;
 import org.killbill.notificationq.api.NotificationEvent;
 import org.killbill.notificationq.api.NotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService;
 import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
 import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
 import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.killbill.queue.retry.RetryableHandler;
+import org.killbill.queue.retry.RetryableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
-public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
-
-    private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+public class DefaultNextBillingDateNotifier extends RetryableService implements NextBillingDateNotifier {
 
     public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
 
+    private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+    private final Clock clock;
     private final NotificationQueueService notificationQueueService;
     private final SubscriptionBaseInternalApi subscriptionApi;
     private final InvoiceListener listener;
-    private final InternalCallContextFactory callContextFactory;
+    private final InternalCallContextFactory internalCallContextFactory;
 
     private NotificationQueue nextBillingQueue;
 
     @Inject
-    public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
+    public DefaultNextBillingDateNotifier(final Clock clock,
+                                          final NotificationQueueService notificationQueueService,
                                           final SubscriptionBaseInternalApi subscriptionApi,
                                           final InvoiceListener listener,
-                                          final InternalCallContextFactory callContextFactory) {
+                                          final InternalCallContextFactory internalCallContextFactory) {
+        super(notificationQueueService);
+        this.clock = clock;
         this.notificationQueueService = notificationQueueService;
         this.subscriptionApi = subscriptionApi;
         this.listener = listener;
-        this.callContextFactory = callContextFactory;
+        this.internalCallContextFactory = internalCallContextFactory;
     }
 
     @Override
     public void initialize() throws NotificationQueueAlreadyExists {
-
         final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
             @Override
             public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -77,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                 // Just to ensure compatibility with json that might not have that targetDate field (old versions < 0.13.6)
                 final DateTime targetDate = key.getTargetDate() != null ? key.getTargetDate() : eventDate;
                 try {
-                    final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), callContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
+                    final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
                     if (subscription == null) {
                         log.warn("Unable to retrieve subscriptionId='{}' for event {}", key.getUuidKey(), key);
                         return;
@@ -88,19 +95,24 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
                     } else {
                         processEventForInvoiceGeneration(key.getUuidKey(), targetDate, userToken, accountRecordId, tenantRecordId);
                     }
-                } catch (SubscriptionBaseApiException e) {
+                } catch (final SubscriptionBaseApiException e) {
                     log.warn("Error retrieving subscriptionId='{}'", key.getUuidKey(), e);
                 }
             }
         };
 
+        final NotificationQueueHandler retryableHandler = new RetryableHandler(clock, this, notificationQueueHandler);
         nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
                                                                             NEXT_BILLING_DATE_NOTIFIER_QUEUE,
-                                                                            notificationQueueHandler);
+                                                                            retryableHandler);
+
+        super.initialize(nextBillingQueue, notificationQueueHandler);
     }
 
     @Override
     public void start() {
+        super.start();
+
         nextBillingQueue.startQueue();
     }
 
@@ -110,6 +122,8 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
             nextBillingQueue.stopQueue();
             notificationQueueService.deleteNotificationQueue(nextBillingQueue.getServiceName(), nextBillingQueue.getQueueName());
         }
+
+        super.stop();
     }
 
     private void processEventForInvoiceGeneration(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
index 128616f..ef72b84 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
@@ -26,6 +26,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
 import org.killbill.billing.invoice.api.InvoiceInternalApi;
 import org.killbill.clock.Clock;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationQueueService;
 
 public class TestInvoiceNotificationQListener extends InvoiceListener {
 
@@ -33,8 +34,13 @@ public class TestInvoiceNotificationQListener extends InvoiceListener {
     UUID latestSubscriptionId = null;
 
     @Inject
-    public TestInvoiceNotificationQListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory, final InvoiceDispatcher dispatcher, final InvoiceInternalApi invoiceApi) {
-        super(accountApi, clock, internalCallContextFactory, null, dispatcher, invoiceApi);
+    public TestInvoiceNotificationQListener(final AccountInternalApi accountApi,
+                                            final Clock clock,
+                                            final InternalCallContextFactory internalCallContextFactory,
+                                            final InvoiceDispatcher dispatcher,
+                                            final InvoiceInternalApi invoiceApi,
+                                            final NotificationQueueService notificationQueueService) {
+        super(accountApi, internalCallContextFactory, dispatcher, invoiceApi, notificationQueueService, clock);
     }
 
     @Override

util/pom.xml 4(+4 -0)

diff --git a/util/pom.xml b/util/pom.xml
index b6369da..8222bf9 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -210,6 +210,10 @@
         </dependency>
         <dependency>
             <groupId>org.kill-bill.billing.plugin</groupId>
+            <artifactId>killbill-plugin-api-invoice</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.kill-bill.billing.plugin</groupId>
             <artifactId>killbill-plugin-api-notification</artifactId>
         </dependency>
         <dependency>
diff --git a/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
new file mode 100644
index 0000000..c33b881
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at:
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
+import org.killbill.billing.events.ControlTagCreationInternalEvent;
+import org.killbill.billing.events.ControlTagDeletionInternalEvent;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.tag.DefaultTagDefinition;
+import org.killbill.billing.util.tag.api.user.DefaultControlTagCreationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestRetryableService extends UtilTestSuiteWithEmbeddedDB {
+
+    private static final String TEST_LISTENER = "TestListener";
+    private static final ImmutableList<Period> RETRY_SCHEDULE = ImmutableList.<Period>of(Period.hours(1), Period.days(1));
+
+    private ControlTagCreationInternalEvent event;
+    private TestListener testListener;
+
+    @BeforeClass(groups = "slow")
+    public void setUpClass() throws Exception {
+        final ImmutableAccountData immutableAccountData = Mockito.mock(ImmutableAccountData.class);
+        Mockito.when(immutableAccountInternalApi.getImmutableAccountDataByRecordId(Mockito.<Long>eq(internalCallContext.getAccountRecordId()), Mockito.<InternalTenantContext>any())).thenReturn(immutableAccountData);
+    }
+
+    @BeforeMethod(groups = "slow")
+    public void setUp() throws Exception {
+        event = new DefaultControlTagCreationEvent(UUID.randomUUID(),
+                                                   UUID.randomUUID(),
+                                                   ObjectType.ACCOUNT,
+                                                   new DefaultTagDefinition("name", "description", false),
+                                                   internalCallContext.getAccountRecordId(),
+                                                   internalCallContext.getTenantRecordId(),
+                                                   UUID.randomUUID());
+
+        testListener = new TestListener();
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @AfterMethod(groups = "slow")
+    public void tearDown() throws Exception {
+        testListener.stop();
+    }
+
+    @Test(groups = "slow")
+    public void testFixUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        testListener.throwRetryableException = false;
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 1);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testGiveUp() throws Exception {
+        testListener.throwRetryableException = true;
+
+        final DateTime startTime = clock.getUTCNow();
+        testListener.handleEvent(event);
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        futureRetryableEvents = getFutureRetryableEvents();
+        Assert.assertEquals(futureRetryableEvents.size(), 1);
+        Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+        clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        // Give up
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    @Test(groups = "slow")
+    public void testNotRetryableException() throws Exception {
+        testListener.throwOtherException = true;
+
+        try {
+            testListener.handleEvent(event);
+            Assert.fail("Expected exception");
+        } catch (final IllegalArgumentException e) {
+            Assert.assertTrue(true);
+        }
+        assertListenerStatus();
+
+        Assert.assertEquals(testListener.eventsSeen.size(), 0);
+        Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+    }
+
+    private List<NotificationEventWithMetadata> getFutureRetryableEvents() throws NoSuchNotificationQueue {
+        final NotificationQueue notificationQueue = queueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, TEST_LISTENER);
+        return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+    }
+
+    private final class TestListener extends RetryableService {
+
+        private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
+        private final Collection<BusInternalEvent> eventsSeen = new LinkedList<BusInternalEvent>();
+
+        private final RetryableSubscriber retryableSubscriber;
+
+        private boolean throwRetryableException = false;
+        private boolean throwOtherException = false;
+
+        public TestListener() {
+            super(queueService);
+
+            subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class,
+                                             new SubscriberAction<ControlTagDeletionInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagDeletionInternalEvent event) {
+                                                     Assert.fail("No handler registered");
+                                                 }
+                                             });
+            subscriberQueueHandler.subscribe(ControlTagCreationInternalEvent.class,
+                                             new SubscriberAction<ControlTagCreationInternalEvent>() {
+                                                 @Override
+                                                 public void run(final ControlTagCreationInternalEvent event) {
+                                                     if (throwRetryableException) {
+                                                         throw new InvoicePluginApiRetryException(RETRY_SCHEDULE);
+                                                     } else if (throwOtherException) {
+                                                         throw new IllegalArgumentException("EXPECTED");
+                                                     } else {
+                                                         eventsSeen.add(event);
+                                                     }
+                                                 }
+                                             });
+            this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+
+            initialize(TEST_LISTENER, subscriberQueueHandler);
+            start();
+        }
+
+        public void handleEvent(final ControlTagCreationInternalEvent event) {
+            retryableSubscriber.handleEvent(event);
+        }
+    }
+}