killbill-aplcache
Changes
beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java 266(+266 -0)
catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql 15(+7 -8)
invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java 40(+27 -13)
pom.xml 2(+1 -1)
util/pom.xml 4(+4 -0)
Details
diff --git a/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
new file mode 100644
index 0000000..1f54f9c
--- /dev/null
+++ b/api/src/main/java/org/killbill/billing/invoice/api/InvoiceListenerService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.invoice.api;
+
+import org.killbill.billing.platform.api.KillbillService;
+
+public interface InvoiceListenerService extends KillbillService {
+}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
index 606d7dc..fc49f94 100644
--- a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestPublicBus.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014 Groupon, Inc
- * Copyright 2014 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -27,10 +27,8 @@ import org.joda.time.DateTime;
import org.killbill.billing.DBTestingHelper;
import org.killbill.billing.account.api.Account;
import org.killbill.billing.api.TestApiListener.NextEvent;
-import org.killbill.billing.beatrix.extbus.DefaultBusExternalEvent;
import org.killbill.billing.callcontext.DefaultCallContext;
import org.killbill.billing.catalog.api.BillingPeriod;
-import org.killbill.billing.catalog.api.PriceListSet;
import org.killbill.billing.catalog.api.ProductCategory;
import org.killbill.billing.entitlement.api.DefaultEntitlement;
import org.killbill.billing.notification.plugin.api.ExtBusEvent;
@@ -46,11 +44,6 @@ import org.killbill.billing.util.callcontext.CallContext;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.jackson.ObjectMapper;
-import org.killbill.billing.util.nodes.NodeCommand;
-import org.killbill.billing.util.nodes.NodeCommandMetadata;
-import org.killbill.billing.util.nodes.NodeCommandProperty;
-import org.killbill.billing.util.nodes.PluginNodeCommandMetadata;
-import org.killbill.billing.util.nodes.SystemNodeCommandType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -58,65 +51,30 @@ import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.Subscribe;
-import static org.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
import static org.testng.Assert.assertNotNull;
public class TestPublicBus extends TestIntegrationBase {
- private PublicListener publicListener;
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private PublicListener publicListener;
private AtomicInteger externalBusCount;
- private final ObjectMapper mapper = new ObjectMapper();
-
@Override
protected KillbillConfigSource getConfigSource() {
- ImmutableMap additionalProperties = new ImmutableMap.Builder()
- .put("org.killbill.billing.util.broadcast.rate", "500ms")
- .build();
+ final ImmutableMap<String, String> additionalProperties = new ImmutableMap.Builder<String, String>().put("org.killbill.billing.util.broadcast.rate", "500ms")
+ .build();
return getConfigSource("/beatrix.properties", additionalProperties);
}
-
- public class PublicListener {
-
- @Subscribe
- public void handleExternalEvents(final ExtBusEvent event) {
- log.info("GOT EXT EVENT " + event);
-
- if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
- event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
- try {
- final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
- Assert.assertNotNull(obj.getBundleExternalKey());
- Assert.assertNotNull(obj.getActionType());
- } catch (JsonParseException e) {
- Assert.fail("Could not deserialize metada section", e);
- } catch (JsonMappingException e) {
- Assert.fail("Could not deserialize metada section", e);
- } catch (IOException e) {
- Assert.fail("Could not deserialize metada section", e);
- }
- }
-
- externalBusCount.incrementAndGet();
-
- }
- }
-
@Override
@BeforeMethod(groups = "slow")
public void beforeMethod() throws Exception {
-
/*
We copy the initialization instead of invoking the super method so we can add the registration
of the publicBus event;
@@ -162,7 +120,6 @@ public class TestPublicBus extends TestIntegrationBase {
@Test(groups = "slow")
public void testSimple() throws Exception {
-
final DateTime initialDate = new DateTime(2012, 2, 1, 0, 3, 42, 0, testTimeZone);
final int billingDay = 2;
@@ -189,7 +146,6 @@ public class TestPublicBus extends TestIntegrationBase {
addDaysAndCheckForCompletion(31, NextEvent.PHASE, NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
-
await().atMost(10, SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -197,12 +153,10 @@ public class TestPublicBus extends TestIntegrationBase {
return externalBusCount.get() == 10;
}
});
-
}
@Test(groups = "slow")
public void testTenantKVChange() throws Exception {
-
final TenantData tenantData = new DefaultTenant(null, clock.getUTCNow(), clock.getUTCNow(), "MY_TENANT", "key", "s3Cr3T");
final CallContext contextWithNoTenant = new DefaultCallContext(null, null, "loulou", CallOrigin.EXTERNAL, UserType.ADMIN, "no reason", "hum", UUID.randomUUID(), clock);
final Tenant tenant = tenantUserApi.createTenant(tenantData, contextWithNoTenant);
@@ -219,4 +173,34 @@ public class TestPublicBus extends TestIntegrationBase {
}
});
}
+
+ public class PublicListener {
+
+ @Subscribe
+ public void handleExternalEvents(final ExtBusEvent event) {
+ log.info("GOT EXT EVENT " + event);
+
+ if (event.getEventType() == ExtBusEventType.SUBSCRIPTION_CREATION ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_CANCEL ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_PHASE ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_CHANGE ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_UNCANCEL ||
+ event.getEventType() == ExtBusEventType.SUBSCRIPTION_BCD_CHANGE) {
+ try {
+ final SubscriptionMetadata obj = (SubscriptionMetadata) mapper.readValue(event.getMetaData(), SubscriptionMetadata.class);
+ Assert.assertNotNull(obj.getBundleExternalKey());
+ Assert.assertNotNull(obj.getActionType());
+ } catch (final JsonParseException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ } catch (final JsonMappingException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ } catch (final IOException e) {
+ Assert.fail("Could not deserialize metada section", e);
+ }
+ }
+
+ externalBusCount.incrementAndGet();
+
+ }
+ }
}
diff --git a/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
new file mode 100644
index 0000000..03dcc30
--- /dev/null
+++ b/beatrix/src/test/java/org/killbill/billing/beatrix/integration/TestWithInvoicePlugin.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.beatrix.integration;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.awaitility.Awaitility;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.LocalDate;
+import org.killbill.billing.account.api.Account;
+import org.killbill.billing.account.api.AccountData;
+import org.killbill.billing.api.TestApiListener.NextEvent;
+import org.killbill.billing.beatrix.util.InvoiceChecker.ExpectedInvoiceItemCheck;
+import org.killbill.billing.catalog.api.BillingPeriod;
+import org.killbill.billing.catalog.api.ProductCategory;
+import org.killbill.billing.entitlement.api.DefaultEntitlement;
+import org.killbill.billing.invoice.api.DefaultInvoiceService;
+import org.killbill.billing.invoice.api.Invoice;
+import org.killbill.billing.invoice.api.InvoiceItem;
+import org.killbill.billing.invoice.api.InvoiceItemType;
+import org.killbill.billing.invoice.model.TaxInvoiceItem;
+import org.killbill.billing.invoice.notification.DefaultNextBillingDateNotifier;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApi;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.osgi.api.OSGIServiceDescriptor;
+import org.killbill.billing.osgi.api.OSGIServiceRegistration;
+import org.killbill.billing.payment.api.PluginProperty;
+import org.killbill.billing.util.callcontext.CallContext;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryNotificationEvent;
+import org.killbill.queue.retry.RetryableService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestWithInvoicePlugin extends TestIntegrationBase {
+
+ @Inject
+ private OSGIServiceRegistration<InvoicePluginApi> pluginRegistry;
+
+ @Inject
+ private NotificationQueueService notificationQueueService;
+
+ private TestInvoicePluginApi testInvoicePluginApi;
+
+ @BeforeClass(groups = "slow")
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+
+ this.testInvoicePluginApi = new TestInvoicePluginApi();
+ pluginRegistry.registerService(new OSGIServiceDescriptor() {
+ @Override
+ public String getPluginSymbolicName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getPluginName() {
+ return "TestInvoicePluginApi";
+ }
+
+ @Override
+ public String getRegistrationName() {
+ return "TestInvoicePluginApi";
+ }
+ }, testInvoicePluginApi);
+ }
+
+ @Test(groups = "slow")
+ public void testWithRetries() throws Exception {
+ // We take april as it has 30 days (easier to play with BCD)
+ // Set clock to the initial start date - we implicitly assume here that the account timezone is UTC
+ clock.setDay(new LocalDate(2012, 4, 1));
+
+ final AccountData accountData = getAccountData(1);
+ final Account account = createAccountWithNonOsgiPaymentMethod(accountData);
+ accountChecker.checkAccount(account.getId(), accountData, callContext);
+
+ // Make invoice plugin fail
+ testInvoicePluginApi.shouldThrowException = true;
+
+ // Create original subscription (Trial PHASE)
+ final DefaultEntitlement bpSubscription = createBaseEntitlementAndCheckForCompletion(account.getId(), "bundleKey", "Pistol", ProductCategory.BASE, BillingPeriod.MONTHLY, NextEvent.CREATE, NextEvent.BLOCK);
+ subscriptionChecker.checkSubscriptionCreated(bpSubscription.getId(), internalCallContext);
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 0);
+
+ // Verify bus event has moved to the retry service (can't easily check the timestamp unfortunately)
+ // No future notification at this point (FIXED item, the PHASE event is the trigger for the next one)
+ checkRetryBusEvents(1, 0);
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ checkRetryBusEvents(2, 0);
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ // No notification in the main queue at this point (the PHASE event is the trigger for the next one)
+ checkNotificationsNoRetry(0);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 1);
+ invoiceChecker.checkInvoice(account.getId(),
+ 1,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.FIXED, BigDecimal.ZERO),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 4, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ busHandler.pushExpectedEvents(NextEvent.PHASE, NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.setDay(new LocalDate("2012-05-01"));
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+ invoiceChecker.checkInvoice(account.getId(),
+ 2,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), new LocalDate(2012, 6, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 5, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.addMonths(1);
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 2);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-06-01T00:05:00", 1);
+
+ // Add 5'
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ // Verify there are no notification duplicates
+ checkRetryNotifications("2012-06-01T00:15:00", 1);
+
+ // Fix invoice plugin
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.INVOICE_PAYMENT, NextEvent.PAYMENT);
+ clock.addDeltaFromReality(10 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ // Invoice was generated
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+ invoiceChecker.checkInvoice(account.getId(),
+ 3,
+ callContext,
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), new LocalDate(2012, 7, 1), InvoiceItemType.RECURRING, new BigDecimal("29.95")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 1), null, InvoiceItemType.TAX, new BigDecimal("1.0")));
+
+ // Make invoice plugin fail again
+ testInvoicePluginApi.shouldThrowException = true;
+
+ clock.setTime(new DateTime("2012-07-01T00:00:00"));
+ assertListenerStatus();
+
+ // Invoice failed to generate
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 3);
+
+ // Verify notification has moved to the retry service
+ checkRetryNotifications("2012-07-01T00:05:00", 1);
+
+ testInvoicePluginApi.shouldThrowException = false;
+
+ busHandler.pushExpectedEvents(NextEvent.INVOICE, NextEvent.PAYMENT, NextEvent.INVOICE_PAYMENT);
+ clock.addDeltaFromReality(5 * 60 * 1000);
+ assertListenerStatus();
+ checkNotificationsNoRetry(1);
+
+ assertEquals(invoiceUserApi.getInvoicesByAccount(account.getId(), false, callContext).size(), 4);
+ }
+
+ private void checkRetryBusEvents(final int retryNb, final int expectedFutureInvoiceNotifications) throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableBusEvents = getFutureInvoiceRetryableBusEvents();
+ return futureInvoiceRetryableBusEvents.size() == 1 && ((RetryNotificationEvent) futureInvoiceRetryableBusEvents.get(0).getEvent()).getRetryNb() == retryNb;
+ }
+ });
+ assertEquals(getFutureInvoiceNotifications().size(), expectedFutureInvoiceNotifications);
+ }
+
+ private void checkRetryNotifications(final String retryDateTime, final int expectedFutureInvoiceNotifications) throws NoSuchNotificationQueue {
+ // Verify notification(s) moved to the retry queue
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ final List<NotificationEventWithMetadata> futureInvoiceRetryableNotifications = getFutureInvoiceRetryableNotifications();
+ return futureInvoiceRetryableNotifications.size() == 1 && futureInvoiceRetryableNotifications.get(0).getEffectiveDate().compareTo(new DateTime(retryDateTime, DateTimeZone.UTC)) == 0;
+ }
+ });
+ assertEquals(getFutureInvoiceNotifications().size(), expectedFutureInvoiceNotifications);
+ }
+
+ private void checkNotificationsNoRetry(final int main) throws NoSuchNotificationQueue {
+ assertEquals(getFutureInvoiceRetryableNotifications().size(), 0);
+ assertEquals(getFutureInvoiceNotifications().size(), main);
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableNotifications() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private List<NotificationEventWithMetadata> getFutureInvoiceRetryableBusEvents() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = notificationQueueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, "invoice-listener");
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ public class TestInvoicePluginApi implements InvoicePluginApi {
+
+ boolean shouldThrowException = false;
+
+ @Override
+ public List<InvoiceItem> getAdditionalInvoiceItems(final Invoice invoice, final boolean isDryRun, final Iterable<PluginProperty> pluginProperties, final CallContext callContext) {
+ if (shouldThrowException) {
+ throw new InvoicePluginApiRetryException();
+ }
+ return ImmutableList.<InvoiceItem>of(createTaxInvoiceItem(invoice));
+ }
+
+ private InvoiceItem createTaxInvoiceItem(final Invoice invoice) {
+ return new TaxInvoiceItem(invoice.getId(), invoice.getAccountId(), null, "Tax Item", clock.getUTCNow().toLocalDate(), BigDecimal.ONE, invoice.getCurrency());
+ }
+ }
+}
diff --git a/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql b/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
index 0bbd08d..dcae41e 100644
--- a/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
+++ b/catalog/src/main/resources/org/killbill/billing/catalog/ddl.sql
@@ -32,7 +32,7 @@ CREATE INDEX catalog_override_phase_definition_idx ON catalog_override_phase_def
DROP TABLE IF EXISTS catalog_override_plan_phase;
CREATE TABLE catalog_override_plan_phase (
record_id serial unique,
- phase_number smallint /*! unsigned */ NOT NULL,
+ phase_number int /*! unsigned */ NOT NULL,
phase_def_record_id bigint /*! unsigned */ not null,
target_plan_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -80,8 +80,8 @@ create table catalog_override_block_definition
(
record_id serial unique,
parent_unit_name varchar(255) NOT NULL,
-size double NOT NULL,
-max double NULL,
+size decimal(15,9) NOT NULL,
+max decimal(15,9) NULL,
currency varchar(3) NOT NULL,
price decimal(15,9) NOT NULL,
effective_date datetime NOT NULL,
@@ -97,7 +97,7 @@ DROP TABLE IF EXISTS catalog_override_phase_usage;
create table catalog_override_phase_usage
(
record_id serial unique,
-usage_number smallint(5) unsigned,
+usage_number int /*! unsigned */,
usage_def_record_id bigint /*! unsigned */ not null,
target_phase_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -111,7 +111,7 @@ DROP TABLE IF EXISTS catalog_override_usage_tier;
create table catalog_override_usage_tier
(
record_id serial unique,
-tier_number smallint(5) unsigned,
+tier_number int /*! unsigned */,
tier_def_record_id bigint /*! unsigned */ not null,
target_usage_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -126,7 +126,7 @@ DROP TABLE IF EXISTS catalog_override_tier_block;
create table catalog_override_tier_block
(
record_id serial unique,
-block_number smallint(5) unsigned,
+block_number int /*! unsigned */,
block_def_record_id bigint /*! unsigned */ not null,
target_tier_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
diff --git a/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql b/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
index 85f5589..0abe200 100644
--- a/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
+++ b/catalog/src/main/resources/org/killbill/billing/catalog/migration/V20161220000000__unit_price_override.sql
@@ -15,6 +15,7 @@ PRIMARY KEY(record_id)
);
CREATE INDEX catalog_override_usage_definition_idx ON catalog_override_usage_definition(tenant_record_id, parent_usage_name, currency);
+
DROP TABLE IF EXISTS catalog_override_tier_definition;
create table catalog_override_tier_definition
(
@@ -35,8 +36,8 @@ create table catalog_override_block_definition
(
record_id serial unique,
parent_unit_name varchar(255) NOT NULL,
-size double NOT NULL,
-max double NULL,
+size decimal(15,9) NOT NULL,
+max decimal(15,9) NULL,
currency varchar(3) NOT NULL,
price decimal(15,9) NOT NULL,
effective_date datetime NOT NULL,
@@ -52,7 +53,7 @@ DROP TABLE IF EXISTS catalog_override_phase_usage;
create table catalog_override_phase_usage
(
record_id serial unique,
-usage_number smallint(5) unsigned,
+usage_number int /*! unsigned */,
usage_def_record_id bigint /*! unsigned */ not null,
target_phase_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -66,7 +67,7 @@ DROP TABLE IF EXISTS catalog_override_usage_tier;
create table catalog_override_usage_tier
(
record_id serial unique,
-tier_number smallint(5) unsigned,
+tier_number int /*! unsigned */,
tier_def_record_id bigint /*! unsigned */ not null,
target_usage_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -81,7 +82,7 @@ DROP TABLE IF EXISTS catalog_override_tier_block;
create table catalog_override_tier_block
(
record_id serial unique,
-block_number smallint(5) unsigned,
+block_number int /*! unsigned */,
block_def_record_id bigint /*! unsigned */ not null,
target_tier_def_record_id bigint /*! unsigned */ not null,
created_date datetime NOT NULL,
@@ -89,6 +90,4 @@ created_by varchar(50) NOT NULL,
tenant_record_id bigint /*! unsigned */ NOT NULL default 0,
PRIMARY KEY(record_id)
);
-CREATE INDEX catalog_override_tier_block_idx ON catalog_override_tier_block(tenant_record_id, block_number, block_def_record_id);
-
-
+CREATE INDEX catalog_override_tier_block_idx ON catalog_override_tier_block(tenant_record_id, block_number, block_def_record_id);
\ No newline at end of file
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
index 2fc4a1f..eabea2e 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/glue/DefaultInvoiceModule.java
@@ -26,6 +26,7 @@ import org.killbill.billing.invoice.ParkedAccountsManager;
import org.killbill.billing.invoice.api.DefaultInvoiceService;
import org.killbill.billing.invoice.api.InvoiceApiHelper;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.InvoicePaymentApi;
import org.killbill.billing.invoice.api.InvoiceService;
import org.killbill.billing.invoice.api.InvoiceUserApi;
@@ -99,8 +100,9 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
bind(InvoiceConfig.class).to(MultiTenantInvoiceConfig.class).asEagerSingleton();
}
- protected void installInvoiceService() {
+ protected void installInvoiceServices() {
bind(InvoiceService.class).to(DefaultInvoiceService.class).asEagerSingleton();
+ bind(InvoiceListenerService.class).to(InvoiceListener.class).asEagerSingleton();
}
protected void installResourceBundleFactory() {
@@ -143,7 +145,7 @@ public class DefaultInvoiceModule extends KillBillModule implements InvoiceModul
installConfig();
installInvoicePluginApi();
- installInvoiceService();
+ installInvoiceServices();
installNotifiers();
installInvoiceDispatcher();
installInvoiceListener();
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
index 2d547ac..d55ebac 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceDispatcher.java
@@ -364,54 +364,38 @@ public class InvoiceDispatcher {
private Invoice processAccountWithLockAndInputTargetDate(final UUID accountId, final LocalDate targetDate,
final BillingEventSet billingEvents, final boolean isDryRun, final InternalCallContext context) throws InvoiceApiException {
+ final ImmutableAccountData account;
try {
- final ImmutableAccountData account = accountApi.getImmutableAccountDataById(accountId, context);
-
- final List<Invoice> invoices = billingEvents.isAccountAutoInvoiceOff() ?
- ImmutableList.<Invoice>of() :
- ImmutableList.<Invoice>copyOf(Collections2.transform(invoiceDao.getInvoicesByAccount(context),
- new Function<InvoiceModelDao, Invoice>() {
- @Override
- public Invoice apply(final InvoiceModelDao input) {
- return new DefaultInvoice(input);
- }
- }));
-
- final Currency targetCurrency = account.getCurrency();
-
- final UUID targetInvoiceId;
- if (billingEvents.isAccountAutoInvoiceReuseDraft()) {
- final InvoiceModelDao earliestDraftInvoice = invoiceDao.getEarliestDraftInvoiceByAccount(context);
- targetInvoiceId = earliestDraftInvoice != null ? earliestDraftInvoice.getId() : null;
- } else {
- targetInvoiceId = null;
- }
-
- final InvoiceWithMetadata invoiceWithMetadata = generator.generateInvoice(account, billingEvents, invoices, targetInvoiceId, targetDate, targetCurrency, context);
- final DefaultInvoice invoice = invoiceWithMetadata.getInvoice();
+ account = accountApi.getImmutableAccountDataById(accountId, context);
+ } catch (final AccountApiException e) {
+ log.error("Unable to generate invoice for accountId='{}', a future notification has NOT been recorded", accountId, e);
+ return null;
+ }
- // Compute future notifications
- final FutureAccountNotifications futureAccountNotifications = createNextFutureNotificationDate(invoiceWithMetadata, context);
+ final InvoiceWithMetadata invoiceWithMetadata = generateKillBillInvoice(account, targetDate, billingEvents, context);
+ final DefaultInvoice invoice = invoiceWithMetadata.getInvoice();
- //
+ // Compute future notifications
+ final FutureAccountNotifications futureAccountNotifications = createNextFutureNotificationDate(invoiceWithMetadata, context);
- // If invoice comes back null, there is nothing new to generate, we can bail early
- //
- if (invoice == null) {
- if (isDryRun) {
- log.info("Generated null dryRun invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
- } else {
- log.info("Generated null invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
+ // If invoice comes back null, there is nothing new to generate, we can bail early
+ if (invoice == null) {
+ if (isDryRun) {
+ log.info("Generated null dryRun invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
+ } else {
+ log.info("Generated null invoice for accountId='{}', targetDate='{}'", accountId, targetDate);
- final BusInternalEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(),
- context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
+ final BusInternalEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(),
+ context.getAccountRecordId(), context.getTenantRecordId(), context.getUserToken());
- commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
- postEvent(event);
- }
- return null;
+ commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
+ postEvent(event);
}
+ return null;
+ }
+ boolean success = false;
+ try {
// Generate missing credit (> 0 for generation and < 0 for use) prior we call the plugin
final InvoiceItem cbaItemPreInvoicePlugins = computeCBAOnExistingInvoice(invoice, context);
DefaultInvoice tmpInvoiceForInvoicePlugins = invoice;
@@ -453,21 +437,47 @@ public class InvoiceDispatcher {
invoiceModelDao.addInvoiceItems(invoiceItemModelDaos);
// Commit invoice on disk
- final boolean isThereAnyItemsLeft = commitInvoiceAndSetFutureNotifications(account, invoiceModelDao, futureAccountNotifications, context);
+ commitInvoiceAndSetFutureNotifications(account, invoiceModelDao, futureAccountNotifications, context);
+ success = true;
+
+ try {
+ setChargedThroughDates(invoice.getInvoiceItems(FixedPriceInvoiceItem.class), invoice.getInvoiceItems(RecurringInvoiceItem.class), context);
+ } catch (final SubscriptionBaseApiException e) {
+ log.error("Failed handling SubscriptionBase change.", e);
+ return null;
+ }
+ }
+ } finally {
+ // Make sure we always set future notifications in case of errors
+ if (!isDryRun && !success) {
+ commitInvoiceAndSetFutureNotifications(account, null, futureAccountNotifications, context);
+ }
+ }
- final boolean isRealInvoiceWithNonEmptyItems = isThereAnyItemsLeft ? isRealInvoiceWithItems : false;
+ return invoice;
+ }
- setChargedThroughDates(invoice.getInvoiceItems(FixedPriceInvoiceItem.class), invoice.getInvoiceItems(RecurringInvoiceItem.class), context);
+ private InvoiceWithMetadata generateKillBillInvoice(final ImmutableAccountData account, final LocalDate targetDate, final BillingEventSet billingEvents, final InternalCallContext context) throws InvoiceApiException {
+ final List<Invoice> invoices = billingEvents.isAccountAutoInvoiceOff() ?
+ ImmutableList.<Invoice>of() :
+ ImmutableList.<Invoice>copyOf(Collections2.transform(invoiceDao.getInvoicesByAccount(context),
+ new Function<InvoiceModelDao, Invoice>() {
+ @Override
+ public Invoice apply(final InvoiceModelDao input) {
+ return new DefaultInvoice(input);
+ }
+ }));
+ final Currency targetCurrency = account.getCurrency();
- }
- return invoice;
- } catch (final AccountApiException e) {
- log.error("Failed handling SubscriptionBase change.", e);
- return null;
- } catch (final SubscriptionBaseApiException e) {
- log.error("Failed handling SubscriptionBase change.", e);
- return null;
+ final UUID targetInvoiceId;
+ if (billingEvents.isAccountAutoInvoiceReuseDraft()) {
+ final InvoiceModelDao earliestDraftInvoice = invoiceDao.getEarliestDraftInvoiceByAccount(context);
+ targetInvoiceId = earliestDraftInvoice != null ? earliestDraftInvoice.getId() : null;
+ } else {
+ targetInvoiceId = null;
}
+
+ return generator.generateInvoice(account, billingEvents, invoices, targetInvoiceId, targetDate, targetCurrency, context);
}
private FutureAccountNotifications createNextFutureNotificationDate(final InvoiceWithMetadata invoiceWithMetadata, final InternalCallContext context) {
@@ -549,17 +559,16 @@ public class InvoiceDispatcher {
log.info(tmp.toString());
}
- private boolean commitInvoiceAndSetFutureNotifications(final ImmutableAccountData account,
- @Nullable final InvoiceModelDao invoiceModelDao,
- final FutureAccountNotifications futureAccountNotifications,
- final InternalCallContext context) throws SubscriptionBaseApiException, InvoiceApiException {
+ private void commitInvoiceAndSetFutureNotifications(final ImmutableAccountData account,
+ @Nullable final InvoiceModelDao invoiceModelDao,
+ final FutureAccountNotifications futureAccountNotifications,
+ final InternalCallContext context) {
final boolean isThereAnyItemsLeft = invoiceModelDao != null && !invoiceModelDao.getInvoiceItems().isEmpty();
if (isThereAnyItemsLeft) {
invoiceDao.createInvoice(invoiceModelDao, futureAccountNotifications, context);
} else {
invoiceDao.setFutureAccountNotificationsForEmptyInvoice(account.getId(), futureAccountNotifications, context);
}
- return isThereAnyItemsLeft;
}
private InvoiceItem computeCBAOnExistingInvoice(final Invoice invoice, final InternalCallContext context) throws InvoiceApiException {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
index f81483d..bf7e879 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceListener.java
@@ -31,13 +31,21 @@ import org.killbill.billing.events.EffectiveSubscriptionInternalEvent;
import org.killbill.billing.events.InvoiceCreationInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
+import org.killbill.billing.invoice.api.InvoiceListenerService;
import org.killbill.billing.invoice.api.user.DefaultInvoiceAdjustmentEvent;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.subscription.api.SubscriptionBaseTransitionType;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,78 +53,157 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceListener {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceListener extends RetryableService implements InvoiceListenerService {
+
+ public static final String INVOICE_LISTENER_SERVICE_NAME = "invoice-listener-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceListener.class);
private final InvoiceDispatcher dispatcher;
private final InternalCallContextFactory internalCallContextFactory;
- private final AccountInternalApi accountApi;
private final InvoiceInternalApi invoiceApi;
- private final InvoiceConfig invoiceConfig;
- private final Clock clock;
+ private final RetryableSubscriber retryableSubscriber;
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory,
- final InvoiceConfig invoiceConfig, final InvoiceDispatcher dispatcher, InvoiceInternalApi invoiceApi) {
- this.accountApi = accountApi;
+ public InvoiceListener(final AccountInternalApi accountApi,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService,
+ final Clock clock) {
+ super(notificationQueueService);
this.dispatcher = dispatcher;
- this.invoiceConfig = invoiceConfig;
this.internalCallContextFactory = internalCallContextFactory;
- this.clock = clock;
this.invoiceApi = invoiceApi;
+
+ subscriberQueueHandler.subscribe(EffectiveSubscriptionInternalEvent.class,
+ new SubscriberAction<EffectiveSubscriptionInternalEvent>() {
+ @Override
+ public void run(final EffectiveSubscriptionInternalEvent event) {
+ try {
+ // Skip future uncancel event
+ // Skip events which are marked as not being the last one
+ if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
+ event.getRemainingEventsForUserOperation() > 0) {
+ return;
+ }
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ dispatcher.processSubscriptionForInvoiceGeneration(event, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(BlockingTransitionInternalEvent.class,
+ new SubscriberAction<BlockingTransitionInternalEvent>() {
+ @Override
+ public void run(final BlockingTransitionInternalEvent event) {
+ // We are only interested in blockBilling or unblockBilling transitions.
+ if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
+ return;
+ }
+
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
+ dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(InvoiceCreationInternalEvent.class,
+ new SubscriberAction<InvoiceCreationInternalEvent>() {
+ @Override
+ public void run(final InvoiceCreationInternalEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
+ }
+
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ subscriberQueueHandler.subscribe(DefaultInvoiceAdjustmentEvent.class,
+ new SubscriberAction<DefaultInvoiceAdjustmentEvent>() {
+ @Override
+ public void run(final DefaultInvoiceAdjustmentEvent event) {
+ try {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ final Account account = accountApi.getAccountById(event.getAccountId(), context);
+
+ // catch children invoices and populate the parent summary invoice
+ if (isChildrenAccountAndPaymentDelegated(account)) {
+ dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
+ }
+ } catch (final InvoiceApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ } catch (final AccountApiException e) {
+ log.warn("Unable to process event {}", event, e);
+ }
+ }
+ });
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_LISTENER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-listener", subscriberQueueHandler);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
@AllowConcurrentEvents
@Subscribe
public void handleSubscriptionTransition(final EffectiveSubscriptionInternalEvent event) {
- try {
- // Skip future uncancel event
- // Skip events which are marked as not being the last one
- if (event.getTransitionType() == SubscriptionBaseTransitionType.UNCANCEL ||
- event.getRemainingEventsForUserOperation() > 0) {
- return;
- }
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- dispatcher.processSubscriptionForInvoiceGeneration(event, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
@AllowConcurrentEvents
@Subscribe
public void handleBlockingStateTransition(final BlockingTransitionInternalEvent event) {
- // We are only interested in blockBilling or unblockBilling transitions.
- if (!event.isTransitionedToUnblockedBilling() && !event.isTransitionedToBlockedBilling()) {
- return;
- }
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "SubscriptionBaseTransition", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final UUID accountId = accountApi.getByRecordId(event.getSearchKey1(), context);
- dispatcher.processAccountFromNotificationOrBusEvent(accountId, null, null, context);
- } catch (InvoiceApiException e) {
- log.warn("Unable to process event {}", event, e);
- } catch (AccountApiException e) {
- log.warn("Unable to process event {}", event, e);
- }
+ retryableSubscriber.handleEvent(event);
}
public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceGeneration(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
public void handleEventForInvoiceNotification(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscriptionForInvoiceNotification(subscriptionId, context.toLocalDate(eventDateTime), context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
log.warn("Unable to process subscriptionId='{}', eventDateTime='{}'", subscriptionId, eventDateTime, e);
}
}
@@ -124,21 +211,7 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceCreationEvent(final InvoiceCreationInternalEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "CreateParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForInvoiceGeneration(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
private boolean isChildrenAccountAndPaymentDelegated(final Account account) {
@@ -149,7 +222,7 @@ public class InvoiceListener {
try {
final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Commit Invoice", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
invoiceApi.commitInvoice(invoiceId, context);
- } catch (InvoiceApiException e) {
+ } catch (final InvoiceApiException e) {
// In case we commit parent invoice earlier we expect to see an INVOICE_INVALID_STATUS status
if (ErrorCode.INVOICE_INVALID_STATUS.getCode() != e.getCode()) {
log.error(e.getMessage());
@@ -160,21 +233,6 @@ public class InvoiceListener {
@AllowConcurrentEvents
@Subscribe
public void handleChildrenInvoiceAdjustmentEvent(final DefaultInvoiceAdjustmentEvent event) {
-
- try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "AdjustParentInvoice", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- final Account account = accountApi.getAccountById(event.getAccountId(), context);
-
- // catch children invoices and populate the parent summary invoice
- if (isChildrenAccountAndPaymentDelegated(account)) {
- dispatcher.processParentInvoiceForAdjustments(account, event.getInvoiceId(), context);
- }
-
- } catch (InvoiceApiException e) {
- log.error(e.getMessage());
- } catch (AccountApiException e) {
- log.error(e.getMessage());
- }
+ retryableSubscriber.handleEvent(event);
}
-
}
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
index ab9b982..afb7750 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/InvoiceTagHandler.java
@@ -1,7 +1,7 @@
/*
* Copyright 2010-2013 Ning, Inc.
- * Copyright 2014-2016 Groupon, Inc
- * Copyright 2014-2016 The Billing Project, LLC
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -24,10 +24,20 @@ import org.killbill.billing.ObjectType;
import org.killbill.billing.callcontext.InternalCallContext;
import org.killbill.billing.events.ControlTagDeletionInternalEvent;
import org.killbill.billing.invoice.api.InvoiceApiException;
+import org.killbill.billing.platform.api.KillbillService;
+import org.killbill.billing.platform.api.LifecycleHandlerType;
+import org.killbill.billing.platform.api.LifecycleHandlerType.LifecycleLevel;
import org.killbill.billing.util.callcontext.CallOrigin;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
import org.killbill.billing.util.callcontext.UserType;
import org.killbill.billing.util.tag.ControlTagType;
+import org.killbill.clock.Clock;
+import org.killbill.notificationq.api.NotificationQueueService;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,28 +45,64 @@ import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-public class InvoiceTagHandler {
+@SuppressWarnings("TypeMayBeWeakened")
+public class InvoiceTagHandler extends RetryableService implements KillbillService {
+
+ private static final String INVOICE_TAG_HANDLER_SERVICE_NAME = "invoice-tag-handler-service";
private static final Logger log = LoggerFactory.getLogger(InvoiceTagHandler.class);
private final InvoiceDispatcher dispatcher;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final RetryableSubscriber retryableSubscriber;
+
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
@Inject
- public InvoiceTagHandler(final InvoiceDispatcher dispatcher,
+ public InvoiceTagHandler(final Clock clock,
+ final InvoiceDispatcher dispatcher,
+ final NotificationQueueService notificationQueueService,
final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService);
this.dispatcher = dispatcher;
- this.internalCallContextFactory = internalCallContextFactory;
+
+ final SubscriberAction<ControlTagDeletionInternalEvent> action = new SubscriberAction<ControlTagDeletionInternalEvent>() {
+ @Override
+ public void run(final ControlTagDeletionInternalEvent event) {
+ if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
+ final UUID accountId = event.getObjectId();
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
+ }
+ }
+ };
+ subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class, action);
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+ }
+
+ @Override
+ public String getName() {
+ return INVOICE_TAG_HANDLER_SERVICE_NAME;
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
+ public void initialize() {
+ super.initialize("invoice-tag-handler", subscriberQueueHandler);
}
@AllowConcurrentEvents
@Subscribe
public void process_AUTO_INVOICING_OFF_removal(final ControlTagDeletionInternalEvent event) {
- if (event.getTagDefinition().getName().equals(ControlTagType.AUTO_INVOICING_OFF.toString()) && event.getObjectType() == ObjectType.ACCOUNT) {
- final UUID accountId = event.getObjectId();
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(event.getSearchKey2(), event.getSearchKey1(), "InvoiceTagHandler", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
- processUnpaid_AUTO_INVOICING_OFF_invoices(accountId, context);
- }
+ retryableSubscriber.handleEvent(event);
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.START_SERVICE)
+ public void start() {
+ super.start();
+ }
+
+ @LifecycleHandlerType(LifecycleLevel.STOP_SERVICE)
+ public void stop() throws NoSuchNotificationQueue {
+ super.stop();
}
private void processUnpaid_AUTO_INVOICING_OFF_invoices(final UUID accountId, final InternalCallContext context) {
diff --git a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 644cec3..85391a5 100644
--- a/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/org/killbill/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
*
- * Ning licenses this file to you under the Apache License, version 2.0
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
@@ -25,45 +27,50 @@ import org.killbill.billing.subscription.api.SubscriptionBase;
import org.killbill.billing.subscription.api.SubscriptionBaseInternalApi;
import org.killbill.billing.subscription.api.user.SubscriptionBaseApiException;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
-import org.killbill.billing.util.config.definition.InvoiceConfig;
+import org.killbill.clock.Clock;
import org.killbill.notificationq.api.NotificationEvent;
import org.killbill.notificationq.api.NotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService;
import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueAlreadyExists;
import org.killbill.notificationq.api.NotificationQueueService.NotificationQueueHandler;
+import org.killbill.queue.retry.RetryableHandler;
+import org.killbill.queue.retry.RetryableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
-
- private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+public class DefaultNextBillingDateNotifier extends RetryableService implements NextBillingDateNotifier {
public static final String NEXT_BILLING_DATE_NOTIFIER_QUEUE = "next-billing-date-queue";
+ private static final Logger log = LoggerFactory.getLogger(DefaultNextBillingDateNotifier.class);
+
+ private final Clock clock;
private final NotificationQueueService notificationQueueService;
private final SubscriptionBaseInternalApi subscriptionApi;
private final InvoiceListener listener;
- private final InternalCallContextFactory callContextFactory;
+ private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue nextBillingQueue;
@Inject
- public DefaultNextBillingDateNotifier(final NotificationQueueService notificationQueueService,
+ public DefaultNextBillingDateNotifier(final Clock clock,
+ final NotificationQueueService notificationQueueService,
final SubscriptionBaseInternalApi subscriptionApi,
final InvoiceListener listener,
- final InternalCallContextFactory callContextFactory) {
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(notificationQueueService);
+ this.clock = clock;
this.notificationQueueService = notificationQueueService;
this.subscriptionApi = subscriptionApi;
this.listener = listener;
- this.callContextFactory = callContextFactory;
+ this.internalCallContextFactory = internalCallContextFactory;
}
@Override
public void initialize() throws NotificationQueueAlreadyExists {
-
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationEvent notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -77,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
// Just to ensure compatibility with json that might not have that targetDate field (old versions < 0.13.6)
final DateTime targetDate = key.getTargetDate() != null ? key.getTargetDate() : eventDate;
try {
- final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), callContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
+ final SubscriptionBase subscription = subscriptionApi.getSubscriptionFromId(key.getUuidKey(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
if (subscription == null) {
log.warn("Unable to retrieve subscriptionId='{}' for event {}", key.getUuidKey(), key);
return;
@@ -88,19 +95,24 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
} else {
processEventForInvoiceGeneration(key.getUuidKey(), targetDate, userToken, accountRecordId, tenantRecordId);
}
- } catch (SubscriptionBaseApiException e) {
+ } catch (final SubscriptionBaseApiException e) {
log.warn("Error retrieving subscriptionId='{}'", key.getUuidKey(), e);
}
}
};
+ final NotificationQueueHandler retryableHandler = new RetryableHandler(clock, this, notificationQueueHandler);
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- notificationQueueHandler);
+ retryableHandler);
+
+ super.initialize(nextBillingQueue, notificationQueueHandler);
}
@Override
public void start() {
+ super.start();
+
nextBillingQueue.startQueue();
}
@@ -110,6 +122,8 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
nextBillingQueue.stopQueue();
notificationQueueService.deleteNotificationQueue(nextBillingQueue.getServiceName(), nextBillingQueue.getQueueName());
}
+
+ super.stop();
}
private void processEventForInvoiceGeneration(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
index 128616f..ef72b84 100644
--- a/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
+++ b/invoice/src/test/java/org/killbill/billing/invoice/TestInvoiceNotificationQListener.java
@@ -26,6 +26,7 @@ import org.killbill.billing.account.api.AccountInternalApi;
import org.killbill.billing.invoice.api.InvoiceInternalApi;
import org.killbill.clock.Clock;
import org.killbill.billing.util.callcontext.InternalCallContextFactory;
+import org.killbill.notificationq.api.NotificationQueueService;
public class TestInvoiceNotificationQListener extends InvoiceListener {
@@ -33,8 +34,13 @@ public class TestInvoiceNotificationQListener extends InvoiceListener {
UUID latestSubscriptionId = null;
@Inject
- public TestInvoiceNotificationQListener(final AccountInternalApi accountApi, final Clock clock, final InternalCallContextFactory internalCallContextFactory, final InvoiceDispatcher dispatcher, final InvoiceInternalApi invoiceApi) {
- super(accountApi, clock, internalCallContextFactory, null, dispatcher, invoiceApi);
+ public TestInvoiceNotificationQListener(final AccountInternalApi accountApi,
+ final Clock clock,
+ final InternalCallContextFactory internalCallContextFactory,
+ final InvoiceDispatcher dispatcher,
+ final InvoiceInternalApi invoiceApi,
+ final NotificationQueueService notificationQueueService) {
+ super(accountApi, internalCallContextFactory, dispatcher, invoiceApi, notificationQueueService, clock);
}
@Override
diff --git a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
index 8d30a76..6cbcebb 100644
--- a/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/org/killbill/billing/jaxrs/resources/AccountResource.java
@@ -821,6 +821,7 @@ public class AccountResource extends JaxRsResourceBase {
@PathParam("accountId") final String accountIdStr,
@QueryParam(QUERY_PAYMENT_METHOD_IS_DEFAULT) @DefaultValue("false") final Boolean isDefault,
@QueryParam(QUERY_PAY_ALL_UNPAID_INVOICES) @DefaultValue("false") final Boolean payAllUnpaidInvoices,
+ @QueryParam(QUERY_PAYMENT_CONTROL_PLUGIN_NAME) final List<String> paymentControlPluginNames,
@QueryParam(QUERY_PLUGIN_PROPERTY) final List<String> pluginPropertiesString,
@HeaderParam(HDR_CREATED_BY) final String createdBy,
@HeaderParam(HDR_REASON) final String reason,
@@ -845,7 +846,9 @@ public class AccountResource extends JaxRsResourceBase {
return Response.status(Status.BAD_REQUEST).build();
}
- final UUID paymentMethodId = paymentApi.addPaymentMethod(account, data.getExternalKey(), data.getPluginName(), isDefault, data.getPluginDetail(), pluginProperties, callContext);
+
+ final PaymentOptions paymentOptions = createControlPluginApiPaymentOptions(paymentControlPluginNames);
+ final UUID paymentMethodId = paymentApi.addPaymentMethodWithPaymentControl(account, data.getExternalKey(), data.getPluginName(), isDefault, data.getPluginDetail(), pluginProperties, paymentOptions, callContext);
if (payAllUnpaidInvoices && unpaidInvoices.size() > 0) {
for (final Invoice invoice : unpaidInvoices) {
createPurchaseForInvoice(account, invoice.getId(), invoice.getBalance(), paymentMethodId, false, null, null, pluginProperties, callContext);
pom.xml 2(+1 -1)
diff --git a/pom.xml b/pom.xml
index aa616f6..817b368 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>killbill-oss-parent</artifactId>
<groupId>org.kill-bill.billing</groupId>
- <version>0.141.1-SNAPSHOT</version>
+ <version>0.141.4</version>
</parent>
<artifactId>killbill</artifactId>
<version>0.19.0-SNAPSHOT</version>
util/pom.xml 4(+4 -0)
diff --git a/util/pom.xml b/util/pom.xml
index b6369da..8222bf9 100644
--- a/util/pom.xml
+++ b/util/pom.xml
@@ -210,6 +210,10 @@
</dependency>
<dependency>
<groupId>org.kill-bill.billing.plugin</groupId>
+ <artifactId>killbill-plugin-api-invoice</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.kill-bill.billing.plugin</groupId>
<artifactId>killbill-plugin-api-notification</artifactId>
</dependency>
<dependency>
diff --git a/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql b/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
index 49d05f7..6fac566 100644
--- a/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
+++ b/util/src/main/resources/org/killbill/billing/util/ddl-postgresql.sql
@@ -45,3 +45,28 @@ CREATE OR REPLACE FUNCTION hour(ts TIMESTAMP WITH TIME ZONE) RETURNS INTEGER AS
RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
+
+/* Alter 'serial' columns to 'bigint' because 'serial' is 32bit in PG while 64bit in MySQL */
+CREATE OR REPLACE FUNCTION update_serial_to_bigint_oncreate()
+ RETURNS event_trigger LANGUAGE plpgsql AS $$
+DECLARE
+ r record;
+ matches text[];
+BEGIN
+ FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
+ LOOP
+ SELECT regexp_matches(current_query(), E'\\m(\\w+)\\s+serial\\M') INTO matches;
+ IF r.object_type = 'table' AND array_length(matches, 1) > 0 THEN
+ RAISE NOTICE 'Altering % % column % from serial to bigint',
+ r.object_type,
+ r.object_identity,
+ matches[1];
+ EXECUTE 'ALTER TABLE ' || r.object_identity || ' ALTER COLUMN ' || matches[1] || ' TYPE bigint';
+ END IF;
+ END LOOP;
+END
+$$;
+
+CREATE EVENT TRIGGER update_serial_to_bigint_oncreate
+ ON ddl_command_end WHEN TAG IN ('CREATE TABLE')
+ EXECUTE PROCEDURE update_serial_to_bigint_oncreate();
diff --git a/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
new file mode 100644
index 0000000..c33b881
--- /dev/null
+++ b/util/src/test/java/org/killbill/billing/util/listener/TestRetryableService.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014-2017 Groupon, Inc
+ * Copyright 2014-2017 The Billing Project, LLC
+ *
+ * The Billing Project licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.killbill.billing.util.listener;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.killbill.billing.ObjectType;
+import org.killbill.billing.account.api.ImmutableAccountData;
+import org.killbill.billing.callcontext.InternalTenantContext;
+import org.killbill.billing.events.BusInternalEvent;
+import org.killbill.billing.events.ControlTagCreationInternalEvent;
+import org.killbill.billing.events.ControlTagDeletionInternalEvent;
+import org.killbill.billing.invoice.plugin.api.InvoicePluginApiRetryException;
+import org.killbill.billing.util.UtilTestSuiteWithEmbeddedDB;
+import org.killbill.billing.util.tag.DefaultTagDefinition;
+import org.killbill.billing.util.tag.api.user.DefaultControlTagCreationEvent;
+import org.killbill.notificationq.api.NotificationEventWithMetadata;
+import org.killbill.notificationq.api.NotificationQueue;
+import org.killbill.notificationq.api.NotificationQueueService.NoSuchNotificationQueue;
+import org.killbill.queue.retry.RetryableService;
+import org.killbill.queue.retry.RetryableSubscriber;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberAction;
+import org.killbill.queue.retry.RetryableSubscriber.SubscriberQueueHandler;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class TestRetryableService extends UtilTestSuiteWithEmbeddedDB {
+
+ private static final String TEST_LISTENER = "TestListener";
+ private static final ImmutableList<Period> RETRY_SCHEDULE = ImmutableList.<Period>of(Period.hours(1), Period.days(1));
+
+ private ControlTagCreationInternalEvent event;
+ private TestListener testListener;
+
+ @BeforeClass(groups = "slow")
+ public void setUpClass() throws Exception {
+ final ImmutableAccountData immutableAccountData = Mockito.mock(ImmutableAccountData.class);
+ Mockito.when(immutableAccountInternalApi.getImmutableAccountDataByRecordId(Mockito.<Long>eq(internalCallContext.getAccountRecordId()), Mockito.<InternalTenantContext>any())).thenReturn(immutableAccountData);
+ }
+
+ @BeforeMethod(groups = "slow")
+ public void setUp() throws Exception {
+ event = new DefaultControlTagCreationEvent(UUID.randomUUID(),
+ UUID.randomUUID(),
+ ObjectType.ACCOUNT,
+ new DefaultTagDefinition("name", "description", false),
+ internalCallContext.getAccountRecordId(),
+ internalCallContext.getTenantRecordId(),
+ UUID.randomUUID());
+
+ testListener = new TestListener();
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @AfterMethod(groups = "slow")
+ public void tearDown() throws Exception {
+ testListener.stop();
+ }
+
+ @Test(groups = "slow")
+ public void testFixUp() throws Exception {
+ testListener.throwRetryableException = true;
+
+ final DateTime startTime = clock.getUTCNow();
+ testListener.handleEvent(event);
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+ testListener.throwRetryableException = false;
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 1);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @Test(groups = "slow")
+ public void testGiveUp() throws Exception {
+ testListener.throwRetryableException = true;
+
+ final DateTime startTime = clock.getUTCNow();
+ testListener.handleEvent(event);
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ List<NotificationEventWithMetadata> futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(0))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ futureRetryableEvents = getFutureRetryableEvents();
+ Assert.assertEquals(futureRetryableEvents.size(), 1);
+ Assert.assertEquals(futureRetryableEvents.get(0).getEffectiveDate().compareTo(startTime.plus(RETRY_SCHEDULE.get(1))), 0);
+
+ clock.setTime(futureRetryableEvents.get(0).getEffectiveDate());
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ // Give up
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ @Test(groups = "slow")
+ public void testNotRetryableException() throws Exception {
+ testListener.throwOtherException = true;
+
+ try {
+ testListener.handleEvent(event);
+ Assert.fail("Expected exception");
+ } catch (final IllegalArgumentException e) {
+ Assert.assertTrue(true);
+ }
+ assertListenerStatus();
+
+ Assert.assertEquals(testListener.eventsSeen.size(), 0);
+ Assert.assertEquals(getFutureRetryableEvents().size(), 0);
+ }
+
+ private List<NotificationEventWithMetadata> getFutureRetryableEvents() throws NoSuchNotificationQueue {
+ final NotificationQueue notificationQueue = queueService.getNotificationQueue(RetryableService.RETRYABLE_SERVICE_NAME, TEST_LISTENER);
+ return ImmutableList.<NotificationEventWithMetadata>copyOf(notificationQueue.getFutureNotificationForSearchKeys(internalCallContext.getAccountRecordId(), internalCallContext.getTenantRecordId()));
+ }
+
+ private final class TestListener extends RetryableService {
+
+ private final SubscriberQueueHandler subscriberQueueHandler = new SubscriberQueueHandler();
+ private final Collection<BusInternalEvent> eventsSeen = new LinkedList<BusInternalEvent>();
+
+ private final RetryableSubscriber retryableSubscriber;
+
+ private boolean throwRetryableException = false;
+ private boolean throwOtherException = false;
+
+ public TestListener() {
+ super(queueService);
+
+ subscriberQueueHandler.subscribe(ControlTagDeletionInternalEvent.class,
+ new SubscriberAction<ControlTagDeletionInternalEvent>() {
+ @Override
+ public void run(final ControlTagDeletionInternalEvent event) {
+ Assert.fail("No handler registered");
+ }
+ });
+ subscriberQueueHandler.subscribe(ControlTagCreationInternalEvent.class,
+ new SubscriberAction<ControlTagCreationInternalEvent>() {
+ @Override
+ public void run(final ControlTagCreationInternalEvent event) {
+ if (throwRetryableException) {
+ throw new InvoicePluginApiRetryException(RETRY_SCHEDULE);
+ } else if (throwOtherException) {
+ throw new IllegalArgumentException("EXPECTED");
+ } else {
+ eventsSeen.add(event);
+ }
+ }
+ });
+ this.retryableSubscriber = new RetryableSubscriber(clock, this, subscriberQueueHandler);
+
+ initialize(TEST_LISTENER, subscriberQueueHandler);
+ start();
+ }
+
+ public void handleEvent(final ControlTagCreationInternalEvent event) {
+ retryableSubscriber.handleEvent(event);
+ }
+ }
+}