killbill-aplcache
Changes
beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java 265(+265 -0)
invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java 40(+27 -13)
util/pom.xml 4(+4 -0)
Details
diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
new file mode 100644
index 0000000..1f54f9c
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.api;
+
+import org.killbill.billing.platform.api.KillbillService;
+
+public interface InvoiceListenerService extends KillbillService {
+}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 606d7dc..fc49f94 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014 Groupon, Inc
- * Copyright 2014 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -27,10 +27,8 @@ import org.joda.time.DateTime;
import org.killbill.billing.DBTestingHelper;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.beatrix.extbus.DefaultBusExternalEvent;
import org.killbill.billing.callcontext.DefaultCallContext;
import org.killbill.billing.catalog.api.BillingPeriod;
-import org.killbill.billing.catalog.api.PriceListSet;
import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.DefaultEntitlement;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
@@ -46,11 +44,6 @@ import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.jackson.ObjectMapper;
-import org.killbill.billing.util.nodes.NodeCommand;
-import org.killbill.billing.util.nodes.NodeCommandMetadata;
-import org.killbill.billing.util.nodes.NodeCommandProperty;
-import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
-import org.killbill.billing.util.nodes.SystemNodeCommandType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -58,65 +51,30 @@ import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.Subscribe;
-import static org.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
import static org.testng.Assert.assertNotNull;
public class TestPublicBus extends TestIntegrationBase {
- private PublicListener publicListener;
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private PublicListener publicListener;
private AtomicInteger externalBusCount;
- private final ObjectMapper mapper = new ObjectMapper();
-
@Override
protected KillbillConfigSource getConfigSource() {
- ImmutableMap additionalProperties = new ImmutableMap.Builder()
- .put("org.killbill.billing.util.broadcast.rate", "500ms")
- .build();
+ final ImmutableMap<String, String> additionalProperties = new ImmutableMap.Builder<String, String>().put("org.killbill.billing.util.broadcast.rate", "500ms")
+ .build();
return getConfigSource("/beatrix.properties", additionalProperties);
}
-
- public class PublicListener {
-
- @Subscribe
- public void handleExternalEvents(final ExtBusEvent event) {
- log.info("GOT EXT EVENT " + event);
-
- if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
- try {
- final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
- Assert.assertNotNull(obj.getBundleExternalKey());
- Assert.assertNotNull(obj.getActionType());
- } catch (JsonParseException e) {
- Assert.fail("Could not deserialize metada section", e);
- } catch (JsonMappingException e) {
- Assert.fail("Could not deserialize metada section", e);
- } catch (IOException e) {
- Assert.fail("Could not deserialize metada section", e);
- }
- }
-
- externalBusCount.incrementAndGet();
-
- }
- }
-
@Override
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
-
/*
We copy the initialization instead of invoking the super method so we can add the registration
of the publicBus event;
@@ -162,7 +120,6 @@ public class TestPublicBus extends TestIntegrationBase {
@Test(groups = "slow")
public void testSimple() throws Exception {
-
final DateTime initialDate = new DateTime(2012, 2, 1, 0, 3, 42, 0, testTimeZone);
final int billingDay = 2;
@@ -189,7 +146,6 @@ public class TestPublicBus extends TestIntegrationBase {
addDaysAndCheckForCompletion(31, NextEvent.PHASE, NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
-
await().atMost(10, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -197,12 +153,10 @@ public class TestPublicBus extends TestIntegrationBase {
return externalBusCount.get() == 10;
}
});
-
}
@Test(groups = "slow")
public void testTenantKVChange() throws Exception {
-
final TenantData tenantData = new DefaultTenant(null, clock.getUTCNow(), clock.getUTCNow(), "MY_TENANT", "key", "s3Cr3T");
final CallContext contextWithNoTenant = new DefaultCallContext(null, null, "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
final Tenant tenant = tenantUserApi.createTenant(tenantData, contextWithNoTenant);
@@ -219,4 +173,34 @@ public class TestPublicBus extends TestIntegrationBase {
}
});
}
+
+ public class PublicListener {
+
+ @Subscribe
+ public void handleExternalEvents(final ExtBusEvent event) {
+ log.info("GOT EXT EVENT " + event);
+
+ if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
+ try {
+ final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
+ Assert.assertNotNull(obj.getBundleExternalKey());
+ Assert.assertNotNull(obj.getActionType());
+ } catch (final JsonParseException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ } catch (final JsonMappingException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ } catch (final IOException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ }
+ }
+
+ externalBusCount.incrementAndGet();
+
+ }
+ }
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
new file mode 100644
index 0000000..6ae2936
--- /dev/null
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.beatrix.integration;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountData;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.ProductCategory;
+import org.killbill.billing.entitlement.api.DefaultEntitlement;
+import org.killbill.billing.invoice.api.DefaultInvoiceService;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.invoice.model.TaxInvoiceItem;
+import org.killbill.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApi;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestWithInvoicePlugin extends TestIntegrationBase {
+
+ @Inject
+ private OSGIServiceRegistration<InvoicePluginApi> pluginRegistry;
+
+ @Inject
+ private NotificationQueueService notificationQueueService;
+
+ private TestInvoicePluginApi testInvoicePluginApi;
+
+ @BeforeClass(groups = "slow")
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+
+ this.testInvoicePluginApi = new TestInvoicePluginApi();
+ pluginRegistry.registerService(new OSGIServiceDescriptor() {
+ @Override
+ public String getPluginSymbolicName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getPluginName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getRegistrationName() {
+ return "TestInvoicePluginApi";
+ }
+ }, testInvoicePluginApi);
+ }
+
+ @Test(groups = "slow")
+ public void testWithRetries() throws Exception {
+ // We take april as it has 30 days (easier to play with BCD)
+ // Set clock to the initial start date - we implicitly assume here that the account timezone is UTC
+ clock.setDay(new LocalDate(2012, 4, 1));
+
+ final AccountData accountData = getAccountData(1);
+ final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+ accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+ // Make invoice plugin fail
+ testInvoicePluginApi.shouldThrowException = true;
+
+ // Create original subscription (Trial PHASE)
+ final DefaultEntitlement bpSubscription = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK);
+ subscriptionChecker.checkSubscriptionCreated(bpSubscription.getId(), internalCallContext);
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 0);
+
+ // Verify bus event has moved to the retry service (can't easily check the timestamp unfortunately)
+ checkRetryBusEvents();
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ checkRetryBusEvents();
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ // No notification in the main queue at this point (the PHASE event is the trigger for the next one)
+ checkNotificationsNoRetry(0);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 1);
+ invoiceChecker.checkInvoice(account.getId(),
+ 1,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.FIXED, BigDecimal.ZERO),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.setDay(new LocalDate("2012-05-01"));
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+ invoiceChecker.checkInvoice(account.getId(),
+ 2,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), new LocalDate(2012, 6, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.addMonths(1);
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-06-01T00:05:00");
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ checkRetryNotifications("2012-06-01T00:15:00");
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ // Invoice was generated
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+ invoiceChecker.checkInvoice(account.getId(),
+ 3,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), new LocalDate(2012, 7, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.setTime(new DateTime("2012-07-01T00:00:00"));
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-07-01T00:05:00");
+
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 4);
+ }
+
+ private void checkRetryBusEvents() throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableBusEvents = getFutureInvoiceRetryableBusEvents();
+ return futureInvoiceRetryableBusEvents.size() == 1;
+ }
+ });
+ assertEquals(getFutureInvoiceRetryableBusEvents().size(), 1);
+ assertEquals(getFutureInvoiceNotifications().size(), 0);
+ }
+
+ private void checkRetryNotifications(final String retryDateTime) throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableNotifications = getFutureInvoiceRetryableNotifications();
+ return futureInvoiceRetryableNotifications.size() == 1 && futureInvoiceRetryableNotifications.get(0).getEffectiveDate().compareTo(new DateTime(retryDateTime, DateTimeZone.UTC)) == 0;
+ }
+ });
+ assertEquals(getFutureInvoiceRetryableNotifications().size(), 1);
+ assertEquals(getFutureInvoiceNotifications().size(), 0);
+ }
+
+ private void checkNotificationsNoRetry(final int main) throws NoSuchNotificationQueue {
+ assertEquals(getFutureInvoiceRetryableNotifications().size(), 0);
+ assertEquals(getFutureInvoiceNotifications().size(), main);
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableBusEvents() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, "invoice-listener");
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ public class TestInvoicePluginApi implements InvoicePluginApi {
+
+ boolean shouldThrowException = false;
+
+ @Override
+ public List<InvoiceItem> getAdditionalInvoiceItems(final Invoice invoice, final boolean isDryRun, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) {
+ if (shouldThrowException) {
+ throw new InvoicePluginApiRetryException();
+ }
+ return ImmutableList.<InvoiceItem>of(createTaxInvoiceItem(invoice));
+ }
+
+ private InvoiceItem createTaxInvoiceItem(final Invoice invoice) {
+ return new TaxInvoiceItem(invoice.getId(), invoice.getAccountId(), null, "Tax Item", clock.getUTCNow().toLocalDate(), BigDecimal.ONE, invoice.getCurrency());
+ }
+ }
+}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index 2fc4a1f..eabea2e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -26,6 +26,7 @@ import org.killbill.billing.invoice.ParkedAccountsManager;
import org.killbill.billing.invoice.api.DefaultInvoiceService;
import org.killbill.billing.invoice.api.InvoiceApiHelper;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.InvoicePaymentApi;
import org.killbill.billing.invoice.api.InvoiceService;
import org.killbill.billing.invoice.api.InvoiceUserApi;
@@ -99,8 +100,9 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
bind(InvoiceConfig.class).to(MultiTenantInvoiceConfig.class).asEagerSingleton();
}
- protected void installInvoiceService() {
+ protected void installInvoiceServices() {
bind(InvoiceService.class).to(DefaultInvoiceService.class).asEagerSingleton();
+ bind(InvoiceListenerService.class).to(InvoiceListener.class).asEagerSingleton();
}
protected void installResourceBundleFactory() {
@@ -143,7 +145,7 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
installConfig();
installInvoicePluginApi();
- installInvoiceService();
+ installInvoiceServices();
installNotifiers();
installInvoiceDispatcher();
installInvoiceListener();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index f81483d..bf7e879 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -31,13 +31,21 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.events.InvoiceCreationInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.user.DefaultInvoiceAdjustmentEvent;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,78 +53,157 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceListener {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceListener extends RetryableService implements InvoiceListenerService {
+
+ public static final String INVOICE_LISTENER_SERVICE_NAME = "invoice-listener-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
private final InvoiceDispatcher dispatcher;
private final InternalCallContextFactory internalCallContextFactory;
- private final AccountInternalApi accountApi;
private final InvoiceInternalApi invoiceApi;
- private final InvoiceConfig invoiceConfig;
- private final Clock clock;
+ private final RetryableSubscriber retryableSubscriber;
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory,
- final InvoiceConfig invoiceConfig, final InvoiceDispatcher dispatcher, InvoiceInternalApi invoiceApi) {
- this.accountApi = accountApi;
+ public InvoiceListener(final AccountInternalApi accountApi,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService,
+ final Clock clock) {
+ super(notificationQueueService);
this.dispatcher = dispatcher;
- this.invoiceConfig = invoiceConfig;
this.internalCallContextFactory = internalCallContextFactory;
- this.clock = clock;
this.invoiceApi = invoiceApi;
+
+ subscriberQueueHandler.subscribe(EffectiveSubscriptionInternalEvent.class,
+ new SubscriberAction<EffectiveSubscriptionInternalEvent>() {
+ @Override
+ public void run(final EffectiveSubscriptionInternalEvent event) {
+ try {
+ // Skip future uncancel event
+ // Skip events which are marked as not being the last one
+ if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
+ event.getRemainingEventsForUserOperation() > 0) {
+ return;
+ }
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ dispatcher.processSubscriptionForInvoiceGeneration(event, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(BlockingTransitionInternalEvent.class,
+ new SubscriberAction<BlockingTransitionInternalEvent>() {
+ @Override
+ public void run(final BlockingTransitionInternalEvent event) {
+ // We are only interested in blockBilling or unblockBilling transitions.
+ if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
+ return;
+ }
+
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
+ dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(InvoiceCreationInternalEvent.class,
+ new SubscriberAction<InvoiceCreationInternalEvent>() {
+ @Override
+ public void run(final InvoiceCreationInternalEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
+ }
+
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(DefaultInvoiceAdjustmentEvent.class,
+ new SubscriberAction<DefaultInvoiceAdjustmentEvent>() {
+ @Override
+ public void run(final DefaultInvoiceAdjustmentEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
+ }
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_LISTENER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-listener", subscriberQueueHandler);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
@AllowConcurrentEvents
@Subscribe
public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
- try {
- // Skip future uncancel event
- // Skip events which are marked as not being the last one
- if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
- event.getRemainingEventsForUserOperation() > 0) {
- return;
- }
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- dispatcher.processSubscriptionForInvoiceGeneration(event, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
@AllowConcurrentEvents
@Subscribe
public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
- // We are only interested in blockBilling or unblockBilling transitions.
- if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
- return;
- }
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
- dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- } catch (AccountApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceGeneration(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
public void handleEventForInvoiceNotification(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceNotification(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
@@ -124,21 +211,7 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceCreationEvent(final InvoiceCreationInternalEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
@@ -149,7 +222,7 @@ public class InvoiceListener {
try {
final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Commit Invoice", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
invoiceApi.commitInvoice(invoiceId, context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
// In case we commit parent invoice earlier we expect to see an INVOICE_INVALID_STATUS status
if (ErrorCode.INVOICE_INVALID_STATUS.getCode() != e.getCode()) {
log.error(e.getMessage());
@@ -160,21 +233,6 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceAdjustmentEvent(final DefaultInvoiceAdjustmentEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
-
}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index ab9b982..afb7750 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -24,10 +24,20 @@ import org.killbill.billing.ObjectType;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.events.ControlTagDeletionInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.platform.api.KillbillService;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.tag.ControlTagType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,28 +45,64 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceTagHandler {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceTagHandler extends RetryableService implements KillbillService {
+
+ private static final String INVOICE_TAG_HANDLER_SERVICE_NAME = "invoice-tag-handler-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceTagHandler.class);
private final InvoiceDispatcher dispatcher;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final RetryableSubscriber retryableSubscriber;
+
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceTagHandler(final InvoiceDispatcher dispatcher,
+ public InvoiceTagHandler(final Clock clock,
+ final InvoiceDispatcher dispatcher,
+ final NotificationQueueService notificationQueueService,
final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService);
this.dispatcher = dispatcher;
- this.internalCallContextFactory = internalCallContextFactory;
+
+ final SubscriberAction<ControlTagDeletionInternalEvent> action = new SubscriberAction<ControlTagDeletionInternalEvent>() {
+ @Override
+ public void run(final ControlTagDeletionInternalEvent event) {
+ if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
+ final UUID accountId = event.getObjectId();
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
+ }
+ }
+ };
+ subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class, action);
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_TAG_HANDLER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-tag-handler", subscriberQueueHandler);
}
@AllowConcurrentEvents
@Subscribe
public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
- if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
- final UUID accountId = event.getObjectId();
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
- }
+ retryableSubscriber.handleEvent(event);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
private void processUnpaid_AUTO_INVOICING_OFF_invoices(final UUID accountId, final InternalCallContext context) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 644cec3..85391a5 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -25,45 +27,50 @@ import org.killbill.billing.subscription.api.SubscriptionBase;
import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.clock.Clock;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.killbill.queue.retry.RetryableHandler;
+import org.killbill.queue.retry.RetryableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
-
- private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+public class DefaultNextBillingDateNotifier extends RetryableService implements NextBillingDateNotifier {
public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+ private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+ private final Clock clock;
private final NotificationQueueService notificationQueueService;
private final SubscriptionBaseInternalApi subscriptionApi;
private final InvoiceListener listener;
- private final InternalCallContextFactory callContextFactory;
+ private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue nextBillingQueue;
@Inject
- public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
+ public DefaultNextBillingDateNotifier(final Clock clock,
+ final NotificationQueueService notificationQueueService,
final SubscriptionBaseInternalApi subscriptionApi,
final InvoiceListener listener,
- final InternalCallContextFactory callContextFactory) {
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService);
+ this.clock = clock;
this.notificationQueueService = notificationQueueService;
this.subscriptionApi = subscriptionApi;
this.listener = listener;
- this.callContextFactory = callContextFactory;
+ this.internalCallContextFactory = internalCallContextFactory;
}
@Override
public void initialize() throws NotificationQueueAlreadyExists {
-
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -77,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
// Just to ensure compatibility with json that might not have that targetDate field (old versions < 0.13.6)
final DateTime targetDate = key.getTargetDate() != null ? key.getTargetDate() : eventDate;
try {
- final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), callContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
+ final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
if (subscription == null) {
log.warn("Unable to retrieve subscriptionId='{}' for event {}", key.getUuidKey(), key);
return;
@@ -88,19 +95,24 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
} else {
processEventForInvoiceGeneration(key.getUuidKey(), targetDate, userToken, accountRecordId, tenantRecordId);
}
- } catch (SubscriptionBaseApiException e) {
+ } catch (final SubscriptionBaseApiException e) {
log.warn("Error retrieving subscriptionId='{}'", key.getUuidKey(), e);
}
}
};
+ final NotificationQueueHandler retryableHandler = new RetryableHandler(clock, this, notificationQueueHandler);
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- notificationQueueHandler);
+ retryableHandler);
+
+ super.initialize(nextBillingQueue, notificationQueueHandler);
}
@Override
public void start() {
+ super.start();
+
nextBillingQueue.startQueue();
}
@@ -110,6 +122,8 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
nextBillingQueue.stopQueue();
notificationQueueService.deleteNotificationQueue(nextBillingQueue.getServiceName(), nextBillingQueue.getQueueName());
}
+
+ super.stop();
}
private void processEventForInvoiceGeneration(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
index 128616f..ef72b84 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
@@ -26,6 +26,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.clock.Clock;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationQueueService;
public class TestInvoiceNotificationQListener extends InvoiceListener {
@@ -33,8 +34,13 @@ public class TestInvoiceNotificationQListener extends InvoiceListener {
UUID latestSubscriptionId = null;
@Inject
- public TestInvoiceNotificationQListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory, final InvoiceDispatcher dispatcher, final InvoiceInternalApi invoiceApi) {
- super(accountApi, clock, internalCallContextFactory, null, dispatcher, invoiceApi);
+ public TestInvoiceNotificationQListener(final AccountInternalApi accountApi,
+ final Clock clock,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService) {
+ super(accountApi, internalCallContextFactory, dispatcher, invoiceApi, notificationQueueService, clock);
}
@Override
util/pom.xml 4(+4 -0)
diff --git a/util/pom.xml b/util/pom.xml
index b6369da..8222bf9 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -210,6 +210,10 @@
</dependency>
<dependency>
<groupId>org.kill-bill.billing.plugin</groupId>
+ <artifactId>killbill-plugin-api-invoice</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kill-bill.billing.plugin</groupId>
<artifactId>killbill-plugin-api-notification</artifactId>
</dependency>
<dependency>
diff --git a/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
new file mode 100644
index 0000000..c33b881
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
+import org.killbill.billing.events.ControlTagCreationInternalEvent;
+import org.killbill.billing.events.ControlTagDeletionInternalEvent;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.tag.DefaultTagDefinition;
+import org.killbill.billing.util.tag.api.user.DefaultControlTagCreationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestRetryableService extends UtilTestSuiteWithEmbeddedDB {
+
+ private static final String TEST_LISTENER = "TestListener";
+ private static final ImmutableList<Period> RETRY_SCHEDULE = ImmutableList.<Period>of(Period.hours(1), Period.days(1));
+
+ private ControlTagCreationInternalEvent event;
+ private TestListener testListener;
+
+ @BeforeClass(groups = "slow")
+ public void setUpClass() throws Exception {
+ final ImmutableAccountData immutableAccountData = Mockito.mock(ImmutableAccountData.class);
+ Mockito.when(immutableAccountInternalApi.getImmutableAccountDataByRecordId(Mockito.<Long>eq(internalCallContext.getAccountRecordId()), Mockito.<InternalTenantContext>any())).thenReturn(immutableAccountData);
+ }
+
+ @BeforeMethod(groups = "slow")
+ public void setUp() throws Exception {
+ event = new DefaultControlTagCreationEvent(UUID.randomUUID(),
+ UUID.randomUUID(),
+ ObjectType.ACCOUNT,
+ new DefaultTagDefinition("name", "description", false),
+ internalCallContext.getAccountRecordId(),
+ internalCallContext.getTenantRecordId(),
+ UUID.randomUUID());
+
+ testListener = new TestListener();
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @AfterMethod(groups = "slow")
+ public void tearDown() throws Exception {
+ testListener.stop();
+ }
+
+ @Test(groups = "slow")
+ public void testFixUp() throws Exception {
+ testListener.throwRetryableException = true;
+
+ final DateTime startTime = clock.getUTCNow();
+ testListener.handleEvent(event);
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+ testListener.throwRetryableException = false;
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 1);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @Test(groups = "slow")
+ public void testGiveUp() throws Exception {
+ testListener.throwRetryableException = true;
+
+ final DateTime startTime = clock.getUTCNow();
+ testListener.handleEvent(event);
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ // Give up
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @Test(groups = "slow")
+ public void testNotRetryableException() throws Exception {
+ testListener.throwOtherException = true;
+
+ try {
+ testListener.handleEvent(event);
+ Assert.fail("Expected exception");
+ } catch (final IllegalArgumentException e) {
+ Assert.assertTrue(true);
+ }
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ private List<NotificationEventWithMetadata> getFutureRetryableEvents() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = queueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, TEST_LISTENER);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private final class TestListener extends RetryableService {
+
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
+ private final Collection<BusInternalEvent> eventsSeen = new LinkedList<BusInternalEvent>();
+
+ private final RetryableSubscriber retryableSubscriber;
+
+ private boolean throwRetryableException = false;
+ private boolean throwOtherException = false;
+
+ public TestListener() {
+ super(queueService);
+
+ subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class,
+ new SubscriberAction<ControlTagDeletionInternalEvent>() {
+ @Override
+ public void run(final ControlTagDeletionInternalEvent event) {
+ Assert.fail("No handler registered");
+ }
+ });
+ subscriberQueueHandler.subscribe(ControlTagCreationInternalEvent.class,
+ new SubscriberAction<ControlTagCreationInternalEvent>() {
+ @Override
+ public void run(final ControlTagCreationInternalEvent event) {
+ if (throwRetryableException) {
+ throw new InvoicePluginApiRetryException(RETRY_SCHEDULE);
+ } else if (throwOtherException) {
+ throw new IllegalArgumentException("EXPECTED");
+ } else {
+ eventsSeen.add(event);
+ }
+ }
+ });
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+
+ initialize(TEST_LISTENER, subscriberQueueHandler);
+ start();
+ }
+
+ public void handleEvent(final ControlTagCreationInternalEvent event) {
+ retryableSubscriber.handleEvent(event);
+ }
+ }
+}