killbill-aplcache
Changes
beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java 265(+265 -0)
Details
diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
new file mode 100644
index 0000000..1f54f9c
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.api;
+
+import org.killbill.billing.platform.api.KillbillService;
+
+public interface InvoiceListenerService extends KillbillService {
+}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
new file mode 100644
index 0000000..080f161
--- /dev/null
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.beatrix.integration;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountData;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.ProductCategory;
+import org.killbill.billing.entitlement.api.DefaultEntitlement;
+import org.killbill.billing.invoice.api.DefaultInvoiceService;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.invoice.model.TaxInvoiceItem;
+import org.killbill.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApi;
+import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.billing.util.listener.RetryException;
+import org.killbill.billing.util.listener.RetryableService;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestWithInvoicePlugin extends TestIntegrationBase {
+
+ @Inject
+ private OSGIServiceRegistration<InvoicePluginApi> pluginRegistry;
+
+ @Inject
+ private NotificationQueueService notificationQueueService;
+
+ private TestInvoicePluginApi testInvoicePluginApi;
+
+ @BeforeClass(groups = "slow")
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+
+ this.testInvoicePluginApi = new TestInvoicePluginApi();
+ pluginRegistry.registerService(new OSGIServiceDescriptor() {
+ @Override
+ public String getPluginSymbolicName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getPluginName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getRegistrationName() {
+ return "TestInvoicePluginApi";
+ }
+ }, testInvoicePluginApi);
+ }
+
+ @Test(groups = "slow")
+ public void testWithRetries() throws Exception {
+ // We take april as it has 30 days (easier to play with BCD)
+ // Set clock to the initial start date - we implicitly assume here that the account timezone is UTC
+ clock.setDay(new LocalDate(2012, 4, 1));
+
+ final AccountData accountData = getAccountData(1);
+ final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+ accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+ // Make invoice plugin fail
+ testInvoicePluginApi.shouldThrowException = true;
+
+ // Create original subscription (Trial PHASE)
+ final DefaultEntitlement bpSubscription = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK);
+ subscriptionChecker.checkSubscriptionCreated(bpSubscription.getId(), internalCallContext);
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 0);
+
+ // Verify bus event has moved to the retry service (can't easily check the timestamp unfortunately)
+ checkRetryBusEvents();
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ checkRetryBusEvents();
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ // No notification in the main queue at this point (the PHASE event is the trigger for the next one)
+ checkNotificationsNoRetry(0);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 1);
+ invoiceChecker.checkInvoice(account.getId(),
+ 1,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.FIXED, BigDecimal.ZERO),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.setDay(new LocalDate("2012-05-01"));
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+ invoiceChecker.checkInvoice(account.getId(),
+ 2,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), new LocalDate(2012, 6, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.addMonths(1);
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-06-01T00:05:00");
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ checkRetryNotifications("2012-06-01T00:15:00");
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ // Invoice was generated
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+ invoiceChecker.checkInvoice(account.getId(),
+ 3,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), new LocalDate(2012, 7, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.setTime(new DateTime("2012-07-01T00:00:00"));
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-07-01T00:05:00");
+
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 4);
+ }
+
+ private void checkRetryBusEvents() throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableBusEvents = getFutureInvoiceRetryableBusEvents();
+ return futureInvoiceRetryableBusEvents.size() == 1;
+ }
+ });
+ assertEquals(getFutureInvoiceRetryableBusEvents().size(), 1);
+ assertEquals(getFutureInvoiceNotifications().size(), 0);
+ }
+
+ private void checkRetryNotifications(final String retryDateTime) throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableNotifications = getFutureInvoiceRetryableNotifications();
+ return futureInvoiceRetryableNotifications.size() == 1 && futureInvoiceRetryableNotifications.get(0).getEffectiveDate().compareTo(new DateTime(retryDateTime, DateTimeZone.UTC)) == 0;
+ }
+ });
+ assertEquals(getFutureInvoiceRetryableNotifications().size(), 1);
+ assertEquals(getFutureInvoiceNotifications().size(), 0);
+ }
+
+ private void checkNotificationsNoRetry(final int main) throws NoSuchNotificationQueue {
+ assertEquals(getFutureInvoiceRetryableNotifications().size(), 0);
+ assertEquals(getFutureInvoiceNotifications().size(), main);
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableBusEvents() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, "invoice-listener");
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ public class TestInvoicePluginApi implements InvoicePluginApi {
+
+ boolean shouldThrowException = false;
+
+ @Override
+ public List<InvoiceItem> getAdditionalInvoiceItems(final Invoice invoice, final boolean isDryRun, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) {
+ if (shouldThrowException) {
+ throw new RetryException();
+ }
+ return ImmutableList.<InvoiceItem>of(createTaxInvoiceItem(invoice));
+ }
+
+ private InvoiceItem createTaxInvoiceItem(final Invoice invoice) {
+ return new TaxInvoiceItem(invoice.getId(), invoice.getAccountId(), null, "Tax Item", clock.getUTCNow().toLocalDate(), BigDecimal.ONE, invoice.getCurrency());
+ }
+ }
+}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index d400a3f..1901d42 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -26,6 +26,7 @@ import org.killbill.billing.invoice.ParkedAccountsManager;
import org.killbill.billing.invoice.api.DefaultInvoiceService;
import org.killbill.billing.invoice.api.InvoiceApiHelper;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.InvoiceNotifier;
import org.killbill.billing.invoice.api.InvoicePaymentApi;
import org.killbill.billing.invoice.api.InvoiceService;
@@ -102,8 +103,9 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
bind(InvoiceConfig.class).to(MultiTenantInvoiceConfig.class).asEagerSingleton();
}
- protected void installInvoiceService() {
+ protected void installInvoiceServices() {
bind(InvoiceService.class).to(DefaultInvoiceService.class).asEagerSingleton();
+ bind(InvoiceListenerService.class).to(InvoiceListener.class).asEagerSingleton();
}
protected void installResourceBundleFactory() {
@@ -154,7 +156,7 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
installConfig();
installInvoicePluginApi();
- installInvoiceService();
+ installInvoiceServices();
installInvoiceNotifier();
installNotifiers();
installInvoiceDispatcher();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index f81483d..e62a105 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -31,13 +31,21 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.events.InvoiceCreationInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.user.DefaultInvoiceAdjustmentEvent;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.billing.util.listener.RetryableService;
+import org.killbill.billing.util.listener.RetryableSubscriber;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberAction;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberQueueHandler;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,78 +53,157 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceListener {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceListener extends RetryableService implements InvoiceListenerService {
+
+ public static final String INVOICE_LISTENER_SERVICE_NAME = "invoice-listener-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
private final InvoiceDispatcher dispatcher;
private final InternalCallContextFactory internalCallContextFactory;
- private final AccountInternalApi accountApi;
private final InvoiceInternalApi invoiceApi;
- private final InvoiceConfig invoiceConfig;
- private final Clock clock;
+ private final RetryableSubscriber retryableSubscriber;
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory,
- final InvoiceConfig invoiceConfig, final InvoiceDispatcher dispatcher, InvoiceInternalApi invoiceApi) {
- this.accountApi = accountApi;
+ public InvoiceListener(final AccountInternalApi accountApi,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService,
+ final Clock clock) {
+ super(notificationQueueService, internalCallContextFactory);
this.dispatcher = dispatcher;
- this.invoiceConfig = invoiceConfig;
this.internalCallContextFactory = internalCallContextFactory;
- this.clock = clock;
this.invoiceApi = invoiceApi;
+
+ subscriberQueueHandler.subscribe(EffectiveSubscriptionInternalEvent.class,
+ new SubscriberAction<EffectiveSubscriptionInternalEvent>() {
+ @Override
+ public void run(final EffectiveSubscriptionInternalEvent event) {
+ try {
+ // Skip future uncancel event
+ // Skip events which are marked as not being the last one
+ if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
+ event.getRemainingEventsForUserOperation() > 0) {
+ return;
+ }
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ dispatcher.processSubscriptionForInvoiceGeneration(event, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(BlockingTransitionInternalEvent.class,
+ new SubscriberAction<BlockingTransitionInternalEvent>() {
+ @Override
+ public void run(final BlockingTransitionInternalEvent event) {
+ // We are only interested in blockBilling or unblockBilling transitions.
+ if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
+ return;
+ }
+
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
+ dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(InvoiceCreationInternalEvent.class,
+ new SubscriberAction<InvoiceCreationInternalEvent>() {
+ @Override
+ public void run(final InvoiceCreationInternalEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
+ }
+
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(DefaultInvoiceAdjustmentEvent.class,
+ new SubscriberAction<DefaultInvoiceAdjustmentEvent>() {
+ @Override
+ public void run(final DefaultInvoiceAdjustmentEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
+ }
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler, internalCallContextFactory);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_LISTENER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-listener", subscriberQueueHandler);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
@AllowConcurrentEvents
@Subscribe
public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
- try {
- // Skip future uncancel event
- // Skip events which are marked as not being the last one
- if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
- event.getRemainingEventsForUserOperation() > 0) {
- return;
- }
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- dispatcher.processSubscriptionForInvoiceGeneration(event, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
@AllowConcurrentEvents
@Subscribe
public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
- // We are only interested in blockBilling or unblockBilling transitions.
- if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
- return;
- }
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
- dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- } catch (AccountApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceGeneration(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
public void handleEventForInvoiceNotification(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceNotification(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
@@ -124,21 +211,7 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceCreationEvent(final InvoiceCreationInternalEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
@@ -149,7 +222,7 @@ public class InvoiceListener {
try {
final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Commit Invoice", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
invoiceApi.commitInvoice(invoiceId, context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
// In case we commit parent invoice earlier we expect to see an INVOICE_INVALID_STATUS status
if (ErrorCode.INVOICE_INVALID_STATUS.getCode() != e.getCode()) {
log.error(e.getMessage());
@@ -160,21 +233,6 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceAdjustmentEvent(final DefaultInvoiceAdjustmentEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
-
}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index ab9b982..69dc218 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -24,10 +24,20 @@ import org.killbill.billing.ObjectType;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.events.ControlTagDeletionInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.platform.api.KillbillService;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.listener.RetryableService;
+import org.killbill.billing.util.listener.RetryableSubscriber;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberAction;
+import org.killbill.billing.util.listener.RetryableSubscriber.SubscriberQueueHandler;
import org.killbill.billing.util.tag.ControlTagType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,28 +45,64 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceTagHandler {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceTagHandler extends RetryableService implements KillbillService {
+
+ private static final String INVOICE_TAG_HANDLER_SERVICE_NAME = "invoice-tag-handler-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceTagHandler.class);
private final InvoiceDispatcher dispatcher;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final RetryableSubscriber retryableSubscriber;
+
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceTagHandler(final InvoiceDispatcher dispatcher,
+ public InvoiceTagHandler(final Clock clock,
+ final InvoiceDispatcher dispatcher,
+ final NotificationQueueService notificationQueueService,
final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService, internalCallContextFactory);
this.dispatcher = dispatcher;
- this.internalCallContextFactory = internalCallContextFactory;
+
+ final SubscriberAction<ControlTagDeletionInternalEvent> action = new SubscriberAction<ControlTagDeletionInternalEvent>() {
+ @Override
+ public void run(final ControlTagDeletionInternalEvent event) {
+ if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
+ final UUID accountId = event.getObjectId();
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
+ }
+ }
+ };
+ subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class, action);
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler, internalCallContextFactory);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_TAG_HANDLER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-tag-handler", subscriberQueueHandler);
}
@AllowConcurrentEvents
@Subscribe
public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
- if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
- final UUID accountId = event.getObjectId();
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
- }
+ retryableSubscriber.handleEvent(event);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
private void processUnpaid_AUTO_INVOICING_OFF_invoices(final UUID accountId, final InternalCallContext context) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 644cec3..0ac03aa 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -25,7 +27,9 @@ import org.killbill.billing.subscription.api.SubscriptionBase;
import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.billing.util.listener.RetryableHandler;
+import org.killbill.billing.util.listener.RetryableService;
+import org.killbill.clock.Clock;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
@@ -37,33 +41,36 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
-
- private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+public class DefaultNextBillingDateNotifier extends RetryableService implements NextBillingDateNotifier {
public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+ private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+ private final Clock clock;
private final NotificationQueueService notificationQueueService;
private final SubscriptionBaseInternalApi subscriptionApi;
private final InvoiceListener listener;
- private final InternalCallContextFactory callContextFactory;
+ private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue nextBillingQueue;
@Inject
- public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
+ public DefaultNextBillingDateNotifier(final Clock clock,
+ final NotificationQueueService notificationQueueService,
final SubscriptionBaseInternalApi subscriptionApi,
final InvoiceListener listener,
- final InternalCallContextFactory callContextFactory) {
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService, internalCallContextFactory);
+ this.clock = clock;
this.notificationQueueService = notificationQueueService;
this.subscriptionApi = subscriptionApi;
this.listener = listener;
- this.callContextFactory = callContextFactory;
+ this.internalCallContextFactory = internalCallContextFactory;
}
@Override
public void initialize() throws NotificationQueueAlreadyExists {
-
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -77,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
// Just to ensure compatibility with json that might not have that targetDate field (old versions < 0.13.6)
final DateTime targetDate = key.getTargetDate() != null ? key.getTargetDate() : eventDate;
try {
- final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), callContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
+ final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
if (subscription == null) {
log.warn("Unable to retrieve subscriptionId='{}' for event {}", key.getUuidKey(), key);
return;
@@ -88,19 +95,24 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
} else {
processEventForInvoiceGeneration(key.getUuidKey(), targetDate, userToken, accountRecordId, tenantRecordId);
}
- } catch (SubscriptionBaseApiException e) {
+ } catch (final SubscriptionBaseApiException e) {
log.warn("Error retrieving subscriptionId='{}'", key.getUuidKey(), e);
}
}
};
+ final NotificationQueueHandler retryableHandler = new RetryableHandler(clock, this, notificationQueueHandler, internalCallContextFactory);
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- notificationQueueHandler);
+ retryableHandler);
+
+ super.initialize(nextBillingQueue, notificationQueueHandler);
}
@Override
public void start() {
+ super.start();
+
nextBillingQueue.startQueue();
}
@@ -110,6 +122,8 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
nextBillingQueue.stopQueue();
notificationQueueService.deleteNotificationQueue(nextBillingQueue.getServiceName(), nextBillingQueue.getQueueName());
}
+
+ super.stop();
}
private void processEventForInvoiceGeneration(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
index 128616f..ef72b84 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
@@ -26,6 +26,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.clock.Clock;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationQueueService;
public class TestInvoiceNotificationQListener extends InvoiceListener {
@@ -33,8 +34,13 @@ public class TestInvoiceNotificationQListener extends InvoiceListener {
UUID latestSubscriptionId = null;
@Inject
- public TestInvoiceNotificationQListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory, final InvoiceDispatcher dispatcher, final InvoiceInternalApi invoiceApi) {
- super(accountApi, clock, internalCallContextFactory, null, dispatcher, invoiceApi);
+ public TestInvoiceNotificationQListener(final AccountInternalApi accountApi,
+ final Clock clock,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService) {
+ super(accountApi, internalCallContextFactory, dispatcher, invoiceApi, notificationQueueService, clock);
}
@Override
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryableHandler.java b/util/src/main/java/org/killbill/billing/util/listener/RetryableHandler.java
new file mode 100644
index 0000000..7eda919
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryableHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+
+public class RetryableHandler implements NotificationQueueHandler {
+
+ protected final Clock clock;
+
+ private final RetryableService retryableService;
+ private final InternalCallContextFactory internalCallContextFactory;
+
+ private final NotificationQueueHandler handlerDelegate;
+
+ public RetryableHandler(final Clock clock,
+ final RetryableService retryableService,
+ final NotificationQueueHandler handlerDelegate,
+ final InternalCallContextFactory internalCallContextFactory) {
+ this.clock = clock;
+ this.retryableService = retryableService;
+ this.handlerDelegate = handlerDelegate;
+ this.internalCallContextFactory = internalCallContextFactory;
+ }
+
+ @Override
+ public void handleReadyNotification(final NotificationEvent notificationEvent, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
+ try {
+ handlerDelegate.handleReadyNotification(notificationEvent, eventDateTime, userToken, searchKey1, searchKey2);
+ } catch (final RetryException e) {
+ // Let the retry queue handle the exception
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(searchKey2,
+ searchKey1,
+ "RetryableHandler",
+ CallOrigin.INTERNAL,
+ UserType.SYSTEM,
+ userToken);
+ retryableService.scheduleRetry(e, notificationEvent, eventDateTime, internalCallContext, 1);
+ }
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java b/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java
new file mode 100644
index 0000000..65e2626
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryableService.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.callcontext.InternalCallContext;
+import org.killbill.billing.util.callcontext.CallOrigin;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.billing.util.callcontext.UserType;
+import org.killbill.billing.util.jackson.ObjectMapper;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.killbill.queue.api.QueueEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RetryableService {
+
+ public static final String RETRYABLE_SERVICE_NAME = "notifications-retries";
+
+ private static final Logger log = LoggerFactory.getLogger(RetryableService.class);
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final NotificationQueueService notificationQueueService;
+ private final InternalCallContextFactory internalCallContextFactory;
+
+ private NotificationQueue retryNotificationQueue;
+
+ public RetryableService(final NotificationQueueService notificationQueueService, final InternalCallContextFactory internalCallContextFactory) {
+ this.notificationQueueService = notificationQueueService;
+ this.internalCallContextFactory = internalCallContextFactory;
+ }
+
+ public void initialize(final NotificationQueue originalQueue, final NotificationQueueHandler originalQueueHandler) {
+ initialize(originalQueue.getQueueName(), originalQueueHandler);
+ }
+
+ public void initialize(final String queueName, final NotificationQueueHandler originalQueueHandler) {
+ try {
+ final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
+
+ @Override
+ public void handleReadyNotification(final NotificationEvent eventJson,
+ final DateTime eventDateTime,
+ final UUID userToken,
+ final Long searchKey1,
+ final Long searchKey2) {
+ if (eventJson instanceof RetryNotificationEvent) {
+ final RetryNotificationEvent retryNotificationEvent = (RetryNotificationEvent) eventJson;
+
+ final NotificationEvent notificationEvent;
+ try {
+ notificationEvent = (NotificationEvent) objectMapper.readValue(retryNotificationEvent.getOriginalEvent(), retryNotificationEvent.getOriginalEventClass());
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ originalQueueHandler.handleReadyNotification(notificationEvent,
+ eventDateTime,
+ userToken,
+ searchKey1,
+ searchKey2);
+ } catch (final RetryException e) {
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(searchKey2,
+ searchKey1,
+ "RetryableService",
+ CallOrigin.INTERNAL,
+ UserType.SYSTEM,
+ userToken);
+ scheduleRetry(e,
+ notificationEvent,
+ retryNotificationEvent.getOriginalEffectiveDate(),
+ internalCallContext,
+ retryNotificationEvent.getRetryNb() + 1);
+ }
+ } else {
+ log.error("Retry service received an unexpected event className='{}'", eventJson.getClass());
+ }
+ }
+ };
+
+ this.retryNotificationQueue = notificationQueueService.createNotificationQueue(RETRYABLE_SERVICE_NAME,
+ queueName,
+ notificationQueueHandler);
+ } catch (final NotificationQueueAlreadyExists notificationQueueAlreadyExists) {
+ throw new RuntimeException(notificationQueueAlreadyExists);
+ }
+ }
+
+ public void start() {
+ retryNotificationQueue.startQueue();
+ }
+
+ public void stop() throws NoSuchNotificationQueue {
+ if (retryNotificationQueue != null) {
+ retryNotificationQueue.stopQueue();
+ notificationQueueService.deleteNotificationQueue(retryNotificationQueue.getServiceName(), retryNotificationQueue.getQueueName());
+ }
+ }
+
+ public void scheduleRetry(final RetryException exception,
+ final QueueEvent originalNotificationEvent,
+ final DateTime originalEffectiveDate,
+ final InternalCallContext context,
+ final int retryNb) {
+ final DateTime effectiveDate = computeRetryDate(exception, originalEffectiveDate, retryNb);
+ if (effectiveDate == null) {
+ log.warn("Error processing event, NOT scheduling retry for event='{}', retryNb='{}'", originalNotificationEvent, effectiveDate, retryNb, exception);
+ return;
+ }
+ log.warn("Error processing event, scheduling retry for event='{}', effectiveDate='{}', retryNb='{}'", originalNotificationEvent, effectiveDate, retryNb, exception);
+
+ try {
+ final NotificationEvent retryNotificationEvent = new RetryNotificationEvent(objectMapper.writeValueAsString(originalNotificationEvent), originalNotificationEvent.getClass(), originalEffectiveDate, retryNb);
+ retryNotificationQueue.recordFutureNotification(effectiveDate, retryNotificationEvent, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
+ } catch (final IOException e) {
+ log.error("Unable to schedule retry for event='{}', effectiveDate='{}'", originalNotificationEvent, effectiveDate, e);
+ }
+ }
+
+ private DateTime computeRetryDate(final RetryException retryException, final DateTime initialEventDateTime, final int retryNb) {
+ final List<Period> retrySchedule = retryException.getRetrySchedule();
+ if (retrySchedule == null) {
+ return null;
+ }
+ if (retryNb > retrySchedule.size()) {
+ return initialEventDateTime.plusDays(retryNb - retrySchedule.size());
+ } else {
+ final Period nextDelay = retrySchedule.get(retryNb - 1);
+ return initialEventDateTime.plus(nextDelay);
+ }
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java b/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java
new file mode 100644
index 0000000..cf0df1a
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryableSubscriber.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.bus.api.BusEvent;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationEvent;
+import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.reflect.TypeToken;
+
+public class RetryableSubscriber extends RetryableHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(RetryableSubscriber.class);
+
+ public RetryableSubscriber(final Clock clock,
+ final RetryableService retryableService,
+ final NotificationQueueHandler handlerDelegate,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, retryableService, handlerDelegate, internalCallContextFactory);
+ }
+
+ public void handleEvent(final BusEvent event) {
+ handleReadyNotification(new SubscriberNotificationEvent(event, event.getClass()),
+ clock.getUTCNow(),
+ event.getUserToken(),
+ event.getSearchKey1(),
+ event.getSearchKey2());
+ }
+
+ public interface SubscriberAction<T extends BusEvent> {
+
+ void run(T event);
+ }
+
+ public static final class SubscriberQueueHandler implements NotificationQueueHandler {
+
+ // Similar to com.google.common.eventbus.SubscriberRegistry
+ private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> FLATTEN_HIERARCHY_CACHE =
+ CacheBuilder.newBuilder()
+ .build(
+ new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() {
+ @Override
+ public ImmutableSet<Class<?>> load(final Class<?> concreteClass) {
+ return ImmutableSet.<Class<?>>copyOf(TypeToken.of(concreteClass).getTypes().rawTypes());
+ }
+ });
+
+ private final Map<Class<?>, SubscriberAction<? extends BusEvent>> actions = new HashMap<Class<?>, SubscriberAction<? extends BusEvent>>();
+
+ public SubscriberQueueHandler() {}
+
+ public void subscribe(final Class<? extends BusEvent> busEventClass, final SubscriberAction<? extends BusEvent> action) {
+ actions.put(busEventClass, action);
+ }
+
+ @Override
+ public void handleReadyNotification(final NotificationEvent eventJson, final DateTime eventDateTime, final UUID userToken, final Long searchKey1, final Long searchKey2) {
+ if (!(eventJson instanceof SubscriberNotificationEvent)) {
+ log.error("SubscriberQueueHandler received an unexpected event className='{}'", eventJson.getClass());
+ } else {
+ final BusEvent busEvent = ((SubscriberNotificationEvent) eventJson).getBusEvent();
+
+ final ImmutableSet<Class<?>> eventTypes = FLATTEN_HIERARCHY_CACHE.getUnchecked(busEvent.getClass());
+ for (final Class<?> eventType : eventTypes) {
+ final SubscriberAction<BusEvent> next = (SubscriberAction<BusEvent>) actions.get(eventType);
+ if (next != null) {
+ next.run(busEvent);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryException.java b/util/src/main/java/org/killbill/billing/util/listener/RetryException.java
new file mode 100644
index 0000000..266f7c2
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryException.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Period;
+
+public class RetryException extends RuntimeException {
+
+ public static final List<Period> DEFAULT_RETRY_SCHEDULE = Arrays.asList(Period.minutes(5),
+ Period.minutes(15),
+ Period.hours(1),
+ Period.hours(6),
+ Period.hours(24));
+
+ private final List<Period> retrySchedule;
+
+ public RetryException() {
+ this(null, null);
+ }
+
+ public RetryException(final Exception e) {
+ this(e, null);
+ }
+
+ public RetryException(final Exception e, @Nullable final List<Period> retrySchedule) {
+ super(e);
+ this.retrySchedule = retrySchedule != null ? retrySchedule : DEFAULT_RETRY_SCHEDULE;
+ }
+
+ public List<Period> getRetrySchedule() {
+ return retrySchedule;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s (retrySchedule: %s)", super.toString(), retrySchedule);
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/RetryNotificationEvent.java b/util/src/main/java/org/killbill/billing/util/listener/RetryNotificationEvent.java
new file mode 100644
index 0000000..4a1486f
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/RetryNotificationEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import org.joda.time.DateTime;
+import org.killbill.notificationq.api.NotificationEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RetryNotificationEvent implements NotificationEvent {
+
+ private final String originalEvent;
+ private final Class originalEventClass;
+ private final DateTime originalEffectiveDate;
+ private final int retryNb;
+
+ @JsonCreator
+ public RetryNotificationEvent(@JsonProperty("originalEvent") final String originalEvent,
+ @JsonProperty("originalEventClass") final Class originalEventClass,
+ @JsonProperty("originalEffectiveDate") final DateTime originalEffectiveDate,
+ @JsonProperty("retryNb") final int retryNb) {
+ this.originalEvent = originalEvent;
+ this.originalEventClass = originalEventClass;
+ this.originalEffectiveDate = originalEffectiveDate;
+ this.retryNb = retryNb;
+ }
+
+ public String getOriginalEvent() {
+ return originalEvent;
+ }
+
+ public Class getOriginalEventClass() {
+ return originalEventClass;
+ }
+
+ public DateTime getOriginalEffectiveDate() {
+ return originalEffectiveDate;
+ }
+
+ public int getRetryNb() {
+ return retryNb;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("RetryNotificationEvent{");
+ sb.append("originalEvent=").append(originalEvent);
+ sb.append(", originalEventClass=").append(originalEventClass);
+ sb.append(", originalEffectiveDate=").append(originalEffectiveDate);
+ sb.append(", retryNb=").append(retryNb);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final RetryNotificationEvent that = (RetryNotificationEvent) o;
+
+ if (retryNb != that.retryNb) {
+ return false;
+ }
+ if (originalEvent != null ? !originalEvent.equals(that.originalEvent) : that.originalEvent != null) {
+ return false;
+ }
+ if (originalEventClass != null ? !originalEventClass.equals(that.originalEventClass) : that.originalEventClass != null) {
+ return false;
+ }
+ return originalEffectiveDate != null ? originalEffectiveDate.compareTo(that.originalEffectiveDate) == 0 : that.originalEffectiveDate == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = originalEvent != null ? originalEvent.hashCode() : 0;
+ result = 31 * result + (originalEventClass != null ? originalEventClass.hashCode() : 0);
+ result = 31 * result + (originalEffectiveDate != null ? originalEffectiveDate.hashCode() : 0);
+ result = 31 * result + retryNb;
+ return result;
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEvent.java b/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEvent.java
new file mode 100644
index 0000000..542f488
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEvent.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import org.killbill.bus.api.BusEvent;
+import org.killbill.notificationq.api.NotificationEvent;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@JsonDeserialize(using = SubscriberNotificationEventDeserializer.class)
+public class SubscriberNotificationEvent implements NotificationEvent {
+
+ private final BusEvent busEvent;
+ private final Class busEventClass;
+
+ @JsonCreator
+ public SubscriberNotificationEvent(@JsonProperty("busEvent") final BusEvent busEvent,
+ @JsonProperty("busEventClass") final Class busEventClass) {
+ this.busEvent = busEvent;
+ this.busEventClass = busEventClass;
+ }
+
+ public BusEvent getBusEvent() {
+ return busEvent;
+ }
+
+ public Class getBusEventClass() {
+ return busEventClass;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("SubscriberNotificationEvent{");
+ sb.append("busEvent=").append(busEvent);
+ sb.append(", busEventClass=").append(busEventClass);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final SubscriberNotificationEvent that = (SubscriberNotificationEvent) o;
+
+ if (busEvent != null ? !busEvent.equals(that.busEvent) : that.busEvent != null) {
+ return false;
+ }
+ return busEventClass != null ? busEventClass.equals(that.busEventClass) : that.busEventClass == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = busEvent != null ? busEvent.hashCode() : 0;
+ result = 31 * result + (busEventClass != null ? busEventClass.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEventDeserializer.java b/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEventDeserializer.java
new file mode 100644
index 0000000..aa52cff
--- /dev/null
+++ b/util/src/main/java/org/killbill/billing/util/listener/SubscriberNotificationEventDeserializer.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.io.IOException;
+
+import org.killbill.billing.util.jackson.ObjectMapper;
+import org.killbill.bus.api.BusEvent;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class SubscriberNotificationEventDeserializer extends JsonDeserializer<SubscriberNotificationEvent> {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public SubscriberNotificationEvent deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ final JsonNode node = p.getCodec().readTree(p);
+
+ final Class<BusEvent> busEventClass;
+ try {
+ busEventClass = (Class<BusEvent>) Class.forName(node.get("busEventClass").textValue());
+ } catch (final ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ return new SubscriberNotificationEvent(objectMapper.treeToValue(node.get("busEvent"), busEventClass), busEventClass);
+ }
+}