killbill-memoizeit
Changes
beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java 8(+4 -4)
entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java 3(+2 -1)
entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java 46(+22 -24)
entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java 44(+26 -18)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java 58(+42 -16)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java 14(+13 -1)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionFactory.java 63(+0 -63)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java 25(+15 -10)
entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java 33(+15 -18)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java 1(+1 -0)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java 37(+16 -21)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java 65(+62 -3)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionBuilder.java 251(+104 -147)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java 174(+126 -48)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java 2(+1 -1)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java 13(+8 -5)
entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java 8(+4 -4)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java 39(+22 -17)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java 14(+1 -13)
invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java 10(+0 -10)
invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java 10(+0 -10)
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java 15(+1 -14)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 15(+8 -7)
util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java 209(+170 -39)
Details
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7bd0ae1..c80e303 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -44,7 +44,6 @@ import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -58,9 +57,11 @@ import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.config.CatalogConfig;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
+
public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private BusinessAccountTagSqlDao accountTagSqlDao;
@@ -73,6 +74,19 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private EntitlementUserApi entitlementUserApi;
private BusinessTagDao tagDao;
+
+ private NotificationQueueConfig config = new NotificationQueueConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return false;
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return 3000;
+ }
+ };
+
@BeforeMethod(groups = "slow")
public void setUp() throws Exception {
final Clock clock = new ClockMock();
@@ -89,13 +103,12 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao);
final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
final AddonUtils addonUtils = new AddonUtils(catalogService);
- final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+ final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
final EntitlementDao entitlementDao = new DefaultEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
final PlanAligner planAligner = new PlanAligner(catalogService);
- final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
- final DefaultSubscriptionFactory subscriptionFactory = new DefaultSubscriptionFactory(apiService, clock, catalogService);
- entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, subscriptionFactory);
- entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, subscriptionFactory, addonUtils, internalCallContextFactory);
+ final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+ entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, apiService, clock, catalogService);
+ entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, addonUtils, internalCallContextFactory);
tagDao = new BusinessTagDao(accountTagSqlDao, invoicePaymentTagSqlDao, invoiceTagSqlDao, subscriptionTransitionTagSqlDao,
accountApi, entitlementApi);
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index e04fed7..5ecdbb6 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -292,4 +292,12 @@ public interface InvoiceUserApi {
* @throws InvoiceApiException
*/
public String getInvoiceAsHTML(UUID invoiceId, TenantContext context) throws AccountApiException, IOException, InvoiceApiException;
+
+ /**
+ * Rebalance CBA for account which have credit and unpaid invoices-- only needed if system is configured to not rebalance automatically.
+ *
+ * @param accountId account id
+ * @param context the callcontext
+ */
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context);
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
private final InternalCallContextFactory internalCallContextFactory;
private final AccountInternalApi accountApi;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
public void start() {
startQueue();
+ isStarted = true;
}
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
try {
final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index 38b3a50..fcd10f8 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -179,17 +179,17 @@ public class TestOverdueIntegration extends TestOverdueBase {
new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 31), InvoiceItemType.REPAIR_ADJ, new BigDecimal("-249.95")),
new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("249.95")));
invoiceChecker.checkInvoice(account.getId(), 4, callContext,
- // Note the end date here is not 07-25, but 07-9. The overdue configuration disabled invoicing between 07-09 and 07-23 (e.g. the bundle
+ // Note the end date here is not 07-25, but 07-10. The overdue configuration disabled invoicing between 07-10 and 07-23 (e.g. the bundle
// was inaccessible, hence we didn't want to charge the customer for that period, even though the account was overdue).
- new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 9), InvoiceItemType.RECURRING, new BigDecimal("74.99")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 10), InvoiceItemType.RECURRING, new BigDecimal("83.31")),
// Item for the upgraded recurring plan
new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 31), InvoiceItemType.RECURRING, new BigDecimal("154.85")),
// Credits consumed
- new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-229.84")));
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-238.16")));
invoiceChecker.checkChargedThroughDate(baseSubscription.getId(), new LocalDate(2012, 7, 31), callContext);
// Verify the account balance: 249.95 - 74.99 - 154.85
- assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-20.11")), 0);
+ assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-11.79")), 0);
}
@Test(groups = "slow")
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
new file mode 100644
index 0000000..39d636b
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.entitlement.events.EntitlementEvent;
+import com.ning.billing.util.clock.Clock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+public class EntitlementApiBase {
+
+ protected final EntitlementDao dao;
+
+ protected final SubscriptionApiService apiService;
+ protected final Clock clock;
+ protected final CatalogService catalogService;
+
+ public EntitlementApiBase(final EntitlementDao dao, final SubscriptionApiService apiService, final Clock clock, final CatalogService catalogService) {
+ this.dao = dao;
+ this.apiService = apiService;
+ this.clock = clock;
+ this.catalogService = catalogService;
+ }
+
+ protected List<Subscription> createSubscriptionsForApiUse(final List<Subscription> internalSubscriptions) {
+ return new ArrayList<Subscription>(Collections2.transform(internalSubscriptions, new Function<Subscription, Subscription>() {
+ @Override
+ public Subscription apply(final Subscription subscription) {
+ return createSubscriptionForApiUse((SubscriptionData) subscription);
+ }
+ }));
+ }
+
+ protected SubscriptionData createSubscriptionForApiUse(final Subscription internalSubscription) {
+ return new SubscriptionData((SubscriptionData) internalSubscription, apiService, clock);
+ }
+
+ protected SubscriptionData createSubscriptionForApiUse(SubscriptionBuilder builder, List<EntitlementEvent> events) {
+ final SubscriptionData subscription = new SubscriptionData(builder, apiService, clock);
+ if (events.size() > 0) {
+ subscription.rebuildTransitions(events, catalogService.getFullCatalog());
+ }
+ return subscription;
+ }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
index a6eb3d9..30bb487 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
@@ -20,7 +20,8 @@ import java.util.List;
import org.joda.time.DateTime;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.events.EntitlementEvent;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index e836d59..1b42632 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -25,13 +25,15 @@ import java.util.UUID;
import org.joda.time.DateTime;
+import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
import com.ning.billing.entitlement.alignment.TimedMigration;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -54,24 +56,20 @@ import com.ning.billing.util.clock.Clock;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
-public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
+public class DefaultEntitlementMigrationApi extends EntitlementApiBase implements EntitlementMigrationApi {
- private final EntitlementDao dao;
private final MigrationPlanAligner migrationAligner;
- private final SubscriptionFactory factory;
- private final Clock clock;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementMigrationApi(final MigrationPlanAligner migrationAligner,
- final SubscriptionFactory factory,
+ final SubscriptionApiService apiService,
+ final CatalogService catalogService,
final EntitlementDao dao,
final Clock clock,
final InternalCallContextFactory internalCallContextFactory) {
- this.dao = dao;
+ super(dao, apiService, clock, catalogService);
this.migrationAligner = migrationAligner;
- this.factory = factory;
- this.clock = clock;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -141,13 +139,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
final DateTime migrationStartDate = events[0].getEventTime();
final List<EntitlementEvent> emptyEvents = Collections.emptyList();
- final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(bundleId)
- .setCategory(productCategory)
- .setBundleStartDate(migrationStartDate)
- .setAlignStartDate(migrationStartDate),
- emptyEvents);
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(bundleId)
+ .setCategory(productCategory)
+ .setBundleStartDate(migrationStartDate)
+ .setAlignStartDate(migrationStartDate),
+ emptyEvents);
return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
}
@@ -157,13 +155,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
final DateTime migrationStartDate = events[0].getEventTime();
final List<EntitlementEvent> emptyEvents = Collections.emptyList();
- final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(bundleId)
- .setCategory(productCategory)
- .setBundleStartDate(bundleStartDate)
- .setAlignStartDate(migrationStartDate),
- emptyEvents);
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(bundleId)
+ .setCategory(productCategory)
+ .setBundleStartDate(bundleStartDate)
+ .setAlignStartDate(migrationStartDate),
+ emptyEvents);
return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
index 4721c66..db0af82 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
@@ -23,10 +23,11 @@ import com.ning.billing.catalog.api.BillingPeriod;
import com.ning.billing.catalog.api.PhaseType;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
public interface SubscriptionApiService {
@@ -54,4 +55,6 @@ public interface SubscriptionApiService {
public boolean changePlanWithPolicy(SubscriptionData subscription, String productName, BillingPeriod term,
String priceList, DateTime requestedDate, ActionPolicy policy, CallContext context)
throws EntitlementUserApiException;
+
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
index 1596030..f4b0420 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
@@ -29,17 +29,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.ErrorCode;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -48,18 +51,17 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
+public class DefaultEntitlementInternalApi extends EntitlementApiBase implements EntitlementInternalApi {
private final Logger log = LoggerFactory.getLogger(DefaultEntitlementInternalApi.class);
- private final EntitlementDao dao;
- private final SubscriptionFactory subscriptionFactory;
@Inject
public DefaultEntitlementInternalApi(final EntitlementDao dao,
- final SubscriptionFactory subscriptionFactory) {
- this.dao = dao;
- this.subscriptionFactory = subscriptionFactory;
+ final DefaultSubscriptionApiService apiService,
+ final Clock clock,
+ final CatalogService catalogService) {
+ super(dao, apiService, clock, catalogService);
}
@Override
@@ -68,26 +70,31 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
}
@Override
- public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final InternalTenantContext context) {
- return dao.getSubscriptions(subscriptionFactory, bundleId, context);
+ public List<Subscription> getSubscriptionsForBundle(UUID bundleId,
+ InternalTenantContext context) {
+ final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, context);
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
- public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, context);
+ public Subscription getBaseSubscription(UUID bundleId,
+ InternalTenantContext context) throws EntitlementUserApiException {
+ final Subscription result = dao.getBaseSubscription(bundleId, context);
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
- public Subscription getSubscriptionFromId(final UUID id, final InternalTenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, context);
+
+ public Subscription getSubscriptionFromId(UUID id,
+ InternalTenantContext context) throws EntitlementUserApiException {
+ final Subscription result = dao.getSubscriptionFromId(id, context);
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
@@ -105,8 +112,9 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
}
@Override
- public void setChargedThroughDate(final UUID subscriptionId, final LocalDate localChargedThruDate, final InternalCallContext context) {
- final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, context);
+ public void setChargedThroughDate(UUID subscriptionId,
+ LocalDate localChargedThruDate, InternalCallContext context) {
+ final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionId, context);
final DateTime chargedThroughDate = localChargedThruDate.toDateTime(new LocalTime(subscription.getStartDate(), DateTimeZone.UTC), DateTimeZone.UTC);
final SubscriptionBuilder builder = new SubscriptionBuilder(subscription)
.setChargedThroughDate(chargedThroughDate)
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
index f61d2a6..c44617a 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.api.timeline;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -25,21 +26,25 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import javax.annotation.Nullable;
+
import org.joda.time.DateTime;
import com.ning.billing.ErrorCode;
import com.ning.billing.catalog.api.CatalogApiException;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.NewEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
@@ -47,17 +52,21 @@ import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.inject.Inject;
import com.google.inject.name.Named;
-public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
+public class DefaultEntitlementTimelineApi extends EntitlementApiBase implements EntitlementTimelineApi {
- private final EntitlementDao dao;
- private final SubscriptionFactory factory;
private final RepairEntitlementLifecycleDao repairDao;
private final CatalogService catalogService;
private final InternalCallContextFactory internalCallContextFactory;
+ private final AddonUtils addonUtils;
+
+ private final SubscriptionApiService repairApiService;
private enum RepairType {
BASE_REPAIR,
@@ -66,14 +75,17 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
@Inject
- public DefaultEntitlementTimelineApi(@Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionFactory factory, final CatalogService catalogService,
+ public DefaultEntitlementTimelineApi(final CatalogService catalogService,
+ final SubscriptionApiService apiService,
@Named(DefaultEntitlementModule.REPAIR_NAMED) final RepairEntitlementLifecycleDao repairDao, final EntitlementDao dao,
- final InternalCallContextFactory internalCallContextFactory) {
+ @Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionApiService repairApiService,
+ final InternalCallContextFactory internalCallContextFactory, final Clock clock, final AddonUtils addonUtils) {
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
- this.dao = dao;
this.repairDao = repairDao;
- this.factory = factory;
this.internalCallContextFactory = internalCallContextFactory;
+ this.repairApiService = repairApiService;
+ this.addonUtils = addonUtils;
}
@Override
@@ -101,7 +113,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
if (bundle == null) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_UNKNOWN_BUNDLE, descBundle);
}
- final List<Subscription> subscriptions = dao.getSubscriptions(factory, bundle.getId(), internalCallContextFactory.createInternalTenantContext(context));
+ final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(bundle.getId(), internalCallContextFactory.createInternalTenantContext(context)));
if (subscriptions.size() == 0) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, bundle.getId());
}
@@ -113,6 +125,18 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
+ private List<SubscriptionDataRepair> convertToSubscriptionsDataRepair(List<Subscription> input) {
+ return new ArrayList<SubscriptionDataRepair>(Collections2.transform(input, new Function<Subscription, SubscriptionDataRepair>() {
+ @Override
+ public SubscriptionDataRepair apply(@Nullable final Subscription subscription) {
+ return convertToSubscriptionDataRepair((SubscriptionData) subscription);
+ }
+ }));
+ }
+ private SubscriptionDataRepair convertToSubscriptionDataRepair(SubscriptionData input) {
+ return new SubscriptionDataRepair(input, repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+ }
+
@Override
public BundleTimeline repairBundle(final BundleTimeline input, final boolean dryRun, final CallContext context) throws EntitlementRepairException {
final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(context);
@@ -123,7 +147,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
// Subscriptions are ordered with BASE subscription first-- if exists
- final List<Subscription> subscriptions = dao.getSubscriptions(factory, input.getId(), tenantContext);
+ final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(input.getId(), tenantContext));
if (subscriptions.size() == 0) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, input.getId());
}
@@ -252,7 +276,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+ private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
throws EntitlementRepairException {
if (!isBasePlanRecreate) {
return;
@@ -269,7 +293,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- private void validateInputSubscriptionsKnown(final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+ private void validateInputSubscriptionsKnown(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
throws EntitlementRepairException {
for (final SubscriptionTimeline cur : input) {
boolean found = false;
@@ -365,7 +389,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
return result;
}
- private String getViewId(final DateTime lastUpdateBundleDate, final List<Subscription> subscriptions) {
+ private String getViewId(final DateTime lastUpdateBundleDate, final List<SubscriptionDataRepair> subscriptions) {
final StringBuilder tmp = new StringBuilder();
long lastOrderedId = -1;
for (final Subscription cur : subscriptions) {
@@ -412,7 +436,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
};
}
- private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<Subscription> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
+ private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
final List<SubscriptionTimeline> result = new LinkedList<SubscriptionTimeline>();
final Set<UUID> repairIds = new TreeSet<UUID>();
@@ -464,7 +488,9 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- return (SubscriptionDataRepair) factory.createSubscription(builder, initialEvents);
+ final SubscriptionDataRepair subscriptiondataRepair = new SubscriptionDataRepair(builder, curData.getEvents(), repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+ subscriptiondataRepair.rebuildTransitions(curData.getEvents(), catalogService.getFullCatalog());
+ return subscriptiondataRepair;
}
private SubscriptionTimeline findAndCreateSubscriptionRepair(final UUID target, final List<SubscriptionTimeline> input) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
index 1870bfe..9f5d397 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
@@ -16,12 +16,17 @@
package com.ning.billing.entitlement.api.timeline;
+import org.joda.time.DateTime;
+
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
@@ -35,7 +40,14 @@ public class RepairSubscriptionApiService extends DefaultSubscriptionApiService
@Named(DefaultEntitlementModule.REPAIR_NAMED) final EntitlementDao dao,
final CatalogService catalogService,
final PlanAligner planAligner,
+ final AddonUtils addonUtils,
final InternalCallContextFactory internalCallContextFactory) {
- super(clock, dao, catalogService, planAligner, internalCallContextFactory);
+ super(clock, dao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+ }
+
+ // Nothing to do for repair as we pass all the repair events in the stream
+ @Override
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+ return 0;
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
index 301f15f..310469c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
@@ -24,7 +24,6 @@ import org.joda.time.DateTime;
import com.ning.billing.ErrorCode;
import com.ning.billing.ObjectType;
-import com.ning.billing.catalog.api.Catalog;
import com.ning.billing.catalog.api.CatalogApiException;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.Plan;
@@ -33,8 +32,8 @@ import com.ning.billing.catalog.api.Product;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransition;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -59,8 +58,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
private final List<EntitlementEvent> initialEvents;
private final InternalCallContextFactory internalCallContextFactory;
- // Low level events are ONLY used for Repair APIs
- private List<EntitlementEvent> events;
public SubscriptionDataRepair(final SubscriptionBuilder builder, final List<EntitlementEvent> initialEvents, final SubscriptionApiService apiService,
final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
@@ -74,6 +71,20 @@ public class SubscriptionDataRepair extends SubscriptionData {
this.internalCallContextFactory = internalCallContextFactory;
}
+
+
+ public SubscriptionDataRepair(final SubscriptionData subscriptionData, final SubscriptionApiService apiService,
+ final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(subscriptionData, apiService , clock);
+ this.repairDao = dao;
+ this.addonUtils = addonUtils;
+ this.clock = clock;
+ this.catalogService = catalogService;
+ this.initialEvents = subscriptionData.getEvents();
+ this.internalCallContextFactory = internalCallContextFactory;
+ }
+
DateTime getLastUserEventEffectiveDate() {
DateTime res = null;
for (final EntitlementEvent cur : events) {
@@ -185,12 +196,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
}
}
- @Override
- public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
- this.events = inputEvents;
- super.rebuildTransitions(inputEvents, catalog);
- }
-
public List<EntitlementEvent> getEvents() {
return events;
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
index 466d466..651b297 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
@@ -29,7 +29,8 @@ import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.PlanPhase;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.BundleTimeline;
@@ -37,7 +38,8 @@ import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -58,22 +60,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
+public class DefaultEntitlementTransferApi extends EntitlementApiBase implements EntitlementTransferApi {
- private final Clock clock;
- private final EntitlementDao dao;
private final CatalogService catalogService;
- private final SubscriptionFactory subscriptionFactory;
private final EntitlementTimelineApi timelineApi;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementTransferApi(final Clock clock, final EntitlementDao dao, final EntitlementTimelineApi timelineApi, final CatalogService catalogService,
- final SubscriptionFactory subscriptionFactory, final InternalCallContextFactory internalCallContextFactory) {
- this.clock = clock;
- this.dao = dao;
+ final SubscriptionApiService apiService, final InternalCallContextFactory internalCallContextFactory) {
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
- this.subscriptionFactory = subscriptionFactory;
this.timelineApi = timelineApi;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -220,7 +217,7 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
DateTime bundleStartdate = null;
for (final SubscriptionTimeline cur : bundleTimeline.getSubscriptions()) {
- final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, cur.getId(), fromInternalCallContext);
+ final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(cur.getId(), fromInternalCallContext);
final List<ExistingEvent> existingEvents = cur.getExistingEvents();
final ProductCategory productCategory = existingEvents.get(0).getPlanPhaseSpecifier().getProductCategory();
if (productCategory == ProductCategory.ADD_ON) {
@@ -254,13 +251,13 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
}
// Create the new subscription for the new bundle on the new account
- final SubscriptionData subscriptionData = subscriptionFactory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(subscriptionBundleData.getId())
- .setCategory(productCategory)
- .setBundleStartDate(effectiveTransferDate)
- .setAlignStartDate(subscriptionAlignStartDate),
- ImmutableList.<EntitlementEvent>of());
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(subscriptionBundleData.getId())
+ .setCategory(productCategory)
+ .setBundleStartDate(effectiveTransferDate)
+ .setAlignStartDate(subscriptionAlignStartDate),
+ ImmutableList.<EntitlementEvent>of());
final List<EntitlementEvent> events = toEvents(existingEvents, subscriptionData, effectiveTransferDate, context);
final SubscriptionMigrationData curData = new SubscriptionMigrationData(subscriptionData, events, null);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
index 32d968c..4272038 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DefaultEffectiveSubscriptionEvent extends DefaultSubscriptionEvent implements EffectiveSubscriptionInternalEvent {
+
public DefaultEffectiveSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate, final Long accountRecordId, final Long tenantRecordId) {
super(in, startDate, accountRecordId, tenantRecordId);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
index d7b8d6e..d3d128c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
@@ -33,8 +33,7 @@ import com.ning.billing.catalog.api.PlanPhase;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
import com.ning.billing.entitlement.api.user.SubscriptionStatusDryRun.DryRunChangeReason;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -48,26 +47,19 @@ import com.ning.billing.util.clock.DefaultClock;
import com.google.inject.Inject;
-public class DefaultEntitlementUserApi implements EntitlementUserApi {
+public class DefaultEntitlementUserApi extends EntitlementApiBase implements EntitlementUserApi {
- private final Clock clock;
- private final EntitlementDao dao;
private final CatalogService catalogService;
- private final DefaultSubscriptionApiService apiService;
private final AddonUtils addonUtils;
- private final SubscriptionFactory subscriptionFactory;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementUserApi(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
- final DefaultSubscriptionApiService apiService, final SubscriptionFactory subscriptionFactory,
+ final DefaultSubscriptionApiService apiService,
final AddonUtils addonUtils, final InternalCallContextFactory internalCallContextFactory) {
- this.clock = clock;
- this.apiService = apiService;
- this.dao = dao;
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
this.addonUtils = addonUtils;
- this.subscriptionFactory = subscriptionFactory;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -82,11 +74,11 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public Subscription getSubscriptionFromId(final UUID id, final TenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription result = dao.getSubscriptionFromId(id, internalCallContextFactory.createInternalTenantContext(context));
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
@@ -111,23 +103,26 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) {
- return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> internalSubscriptions = dao.getSubscriptionsForAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final TenantContext context) {
- return dao.getSubscriptions(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
public Subscription getBaseSubscription(final UUID bundleId, final TenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription result = dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
+
@Override
public SubscriptionBundle createBundleForAccount(final UUID accountId, final String bundleName, final CallContext context)
throws EntitlementUserApiException {
@@ -162,7 +157,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
}
DateTime bundleStartDate = null;
- final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
switch (plan.getProduct().getCategory()) {
case BASE:
if (baseSubscription != null) {
@@ -231,7 +226,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
public List<SubscriptionStatusDryRun> getDryRunChangePlanStatus(final UUID subscriptionId, @Nullable final String baseProductName,
final DateTime requestedDate, final TenantContext context)
throws EntitlementUserApiException {
- final Subscription subscription = dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription subscription = dao.getSubscriptionFromId(subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
if (subscription == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, subscriptionId);
}
@@ -241,7 +236,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
final List<SubscriptionStatusDryRun> result = new LinkedList<SubscriptionStatusDryRun>();
- final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscriptionFactory, subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
for (final Subscription cur : bundleSubscriptions) {
if (cur.getId().equals(subscriptionId)) {
continue;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 3dd5850..d60a898 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -17,6 +17,8 @@
package com.ning.billing.entitlement.api.user;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
@@ -37,11 +39,12 @@ import com.ning.billing.catalog.api.PlanSpecifier;
import com.ning.billing.catalog.api.PriceList;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.Product;
+import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.alignment.TimedPhase;
import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -68,18 +71,22 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
private final EntitlementDao dao;
private final CatalogService catalogService;
private final PlanAligner planAligner;
+ private final AddonUtils addonUtils;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultSubscriptionApiService(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
- final PlanAligner planAligner, final InternalCallContextFactory internalCallContextFactory) {
+ final PlanAligner planAligner, final AddonUtils addonUtils,
+ final InternalCallContextFactory internalCallContextFactory) {
this.clock = clock;
this.catalogService = catalogService;
this.planAligner = planAligner;
this.dao = dao;
+ this.addonUtils = addonUtils;
this.internalCallContextFactory = internalCallContextFactory;
}
+
@Override
public SubscriptionData createPlan(final SubscriptionBuilder builder, final Plan plan, final PhaseType initialPhase,
final String realPriceList, final DateTime requestedDate, final DateTime effectiveDate, final DateTime processedDate,
@@ -215,6 +222,9 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
dao.cancelSubscription(subscription, cancelEvent, internalCallContext, 0);
subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+
+ cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
return (policy == ActionPolicy.IMMEDIATE);
}
@@ -354,9 +364,58 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
dao.changePlan(subscription, changeEvents, internalCallContext);
subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+ cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
return (policy == ActionPolicy.IMMEDIATE);
}
+
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+
+ // If cancellation/change occur in the future, there is nothing to do
+ final DateTime now = clock.getUTCNow();
+ if (effectiveDate.compareTo(now) > 0) {
+ return 0;
+ }
+
+ final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
+
+ final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
+
+ final List<SubscriptionData> subscriptionsToBeCancelled = new LinkedList<SubscriptionData>();
+ final List<EntitlementEvent> cancelEvents = new LinkedList<EntitlementEvent>();
+
+ for (final Subscription subscription : subscriptions) {
+ final SubscriptionData cur = (SubscriptionData) subscription;
+ if (cur.getState() == SubscriptionState.CANCELLED ||
+ cur.getCategory() != ProductCategory.ADD_ON) {
+ continue;
+ }
+
+ final Plan addonCurrentPlan = cur.getCurrentPlan();
+ if (baseProduct == null ||
+ addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
+ !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
+ //
+ // Perform AO cancellation using the effectiveDate of the BP
+ //
+ final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
+ .setSubscriptionId(cur.getId())
+ .setActiveVersion(cur.getActiveVersion())
+ .setProcessedDate(now)
+ .setEffectiveDate(effectiveDate)
+ .setRequestedDate(now)
+ .setUserToken(context.getUserToken())
+ .setFromDisk(true));
+ subscriptionsToBeCancelled.add(cur);
+ cancelEvents.add(cancelEvent);
+ }
+ }
+
+ dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, context);
+ return subscriptionsToBeCancelled.size();
+ }
+
private void validateRequestedDate(final SubscriptionData subscription, final DateTime now, final DateTime requestedDate)
throws EntitlementUserApiException {
@@ -364,7 +423,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
}
- final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
+ final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
if (previousTransition != null && previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 2903504..7d88024 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -39,7 +39,6 @@ import com.ning.billing.catalog.api.PriceList;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Kind;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Order;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.TimeLimit;
@@ -85,6 +84,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
//
private LinkedList<SubscriptionTransitionData> transitions;
+ // Low level events are ONLY used for Repair APIs
+ protected List<EntitlementEvent> events;
+
+
+ public List<EntitlementEvent> getEvents() {
+ return events;
+ }
+
// Transient object never returned at the API
public SubscriptionData(final SubscriptionBuilder builder) {
this(builder, null, null);
@@ -103,6 +110,22 @@ public class SubscriptionData extends EntityBase implements Subscription {
this.paidThroughDate = builder.getPaidThroughDate();
}
+ // Used for API to make sure we have a clock and an apiService set before we return the object
+ public SubscriptionData(final SubscriptionData internalSubscription, final SubscriptionApiService apiService, final Clock clock) {
+ super(internalSubscription.getId(), internalSubscription.getCreatedDate(), internalSubscription.getUpdatedDate());
+ this.apiService = apiService;
+ this.clock = clock;
+ this.bundleId = internalSubscription.getBundleId();
+ this.alignStartDate = internalSubscription.getAlignStartDate();
+ this.bundleStartDate = internalSubscription.getBundleStartDate();
+ this.category = internalSubscription.getCategory();
+ this.activeVersion = internalSubscription.getActiveVersion();
+ this.chargedThroughDate = internalSubscription.getChargedThroughDate();
+ this.paidThroughDate = internalSubscription.getPaidThroughDate();
+ this.transitions = new LinkedList<SubscriptionTransitionData>(internalSubscription.getAllTransitions());
+ this.events = internalSubscription.getEvents();
+ }
+
@Override
public UUID getBundleId() {
return bundleId;
@@ -484,13 +507,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
"Failed to find CurrentPhaseStart id = %s", getId().toString()));
}
- public void rebuildTransitions(final List<EntitlementEvent> inputEvents,
- final Catalog catalog) {
+ public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
if (inputEvents == null) {
return;
}
+ this.events = inputEvents;
+
SubscriptionState nextState = null;
String nextPlanName = null;
String nextPhaseName = null;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 10ac6c9..ce206ed 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -28,12 +28,10 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.Product;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.util.config.EntitlementConfig;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.alignment.TimedPhase;
import com.ning.billing.entitlement.api.EntitlementService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
import com.ning.billing.entitlement.api.user.Subscription;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
@@ -80,29 +78,25 @@ public class Engine implements EventListener, EntitlementService {
private final PlanAligner planAligner;
private final AddonUtils addonUtils;
private final InternalBus eventBus;
- private final EntitlementConfig config;
private final NotificationQueueService notificationQueueService;
- private final SubscriptionFactory subscriptionFactory;
private final InternalCallContextFactory internalCallContextFactory;
-
private NotificationQueue subscriptionEventQueue;
+ private final SubscriptionApiService apiService;
@Inject
public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
- final EntitlementConfig config,
final AddonUtils addonUtils, final InternalBus eventBus,
final NotificationQueueService notificationQueueService,
- final SubscriptionFactory subscriptionFactory,
- final InternalCallContextFactory internalCallContextFactory) {
+ final InternalCallContextFactory internalCallContextFactory,
+ final SubscriptionApiService apiService) {
this.clock = clock;
this.dao = dao;
this.planAligner = planAligner;
this.addonUtils = addonUtils;
- this.config = config;
this.eventBus = eventBus;
this.notificationQueueService = notificationQueueService;
- this.subscriptionFactory = subscriptionFactory;
this.internalCallContextFactory = internalCallContextFactory;
+ this.apiService = apiService;
}
@Override
@@ -135,22 +129,9 @@ public class Engine implements EventListener, EntitlementService {
}
};
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- };
-
subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
NOTIFICATION_QUEUE_NAME,
- queueHandler,
- notificationConfig);
+ queueHandler);
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
@@ -175,7 +156,7 @@ public class Engine implements EventListener, EntitlementService {
return;
}
- final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, event.getSubscriptionId(), context);
+ final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId(), context);
if (subscription == null) {
log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
return;
@@ -188,7 +169,6 @@ public class Engine implements EventListener, EntitlementService {
//
// Do any internal processing on that event before we send the event to the bus
//
-
int theRealSeqId = seqId;
if (event.getType() == EventType.PHASE) {
onPhaseEvent(subscription, context);
@@ -199,7 +179,7 @@ public class Engine implements EventListener, EntitlementService {
try {
final SubscriptionTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
- context.getAccountRecordId(), context.getTenantRecordId());
+ context.getAccountRecordId(), context.getTenantRecordId());
eventBus.post(busEvent, context);
} catch (EventBusException e) {
log.warn("Failed to post entitlement event " + event, e);
@@ -222,47 +202,8 @@ public class Engine implements EventListener, EntitlementService {
}
private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final InternalCallContext context) {
- final DateTime now = clock.getUTCNow();
- final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-
- final List<Subscription> subscriptions = dao.getSubscriptions(subscriptionFactory, baseSubscription.getBundleId(), context);
-
- final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
- final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
- for (final Subscription subscription : subscriptions) {
- final SubscriptionData cur = (SubscriptionData) subscription;
- if (cur.getState() == SubscriptionState.CANCELLED ||
- cur.getCategory() != ProductCategory.ADD_ON) {
- continue;
- }
+ return apiService.cancelAddOnsIfRequired(baseSubscription, event.getEffectiveDate(), context);
+ }
- final Plan addonCurrentPlan = cur.getCurrentPlan();
- if (baseProduct == null ||
- addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
- !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
- //
- // Perform AO cancellation using the effectiveDate of the BP
- //
- final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
- .setSubscriptionId(cur.getId())
- .setActiveVersion(cur.getActiveVersion())
- .setProcessedDate(now)
- .setEffectiveDate(event.getEffectiveDate())
- .setRequestedDate(now)
- .setUserToken(context.getUserToken())
- .setFromDisk(true));
- addOnCancellations.put(cur.getId(), cancelEvent);
- addOnCancellationSubscriptions.put(cur.getId(), cur);
- }
- }
-
- final int addOnSize = addOnCancellations.size();
- int cancelSeq = addOnSize - 1;
- for (final UUID key : addOnCancellations.keySet()) {
- dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
- cancelSeq--;
- }
- return addOnSize;
- }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index c033364..7f077ce 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
*
* Ning licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -39,19 +40,20 @@ import com.ning.billing.ErrorCode;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
import com.ning.billing.entitlement.api.transfer.TransferCancelData;
+import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
@@ -76,6 +78,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
import com.ning.billing.util.events.RepairEntitlementInternalEvent;
import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
@@ -98,6 +101,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
private final NotificationQueueService notificationQueueService;
private final AddonUtils addonUtils;
private final InternalBus eventBus;
+ private final CatalogService catalogService;
@Inject
public DefaultEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
@@ -107,6 +111,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
this.notificationQueueService = notificationQueueService;
this.addonUtils = addonUtils;
this.eventBus = eventBus;
+ this.catalogService = catalogService;
}
@Override
@@ -206,29 +211,26 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
- return getBaseSubscription(factory, bundleId, true, context);
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
+ return getBaseSubscription(bundleId, true, context);
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
- return buildSubscription(factory, getSubscriptionFromId(subscriptionId, context), context);
- }
-
- private Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
+ final Subscription shellSubscription = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
@Override
public Subscription inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
final SubscriptionModelDao model = entitySqlDaoWrapperFactory.become(SubscriptionSqlDao.class).getById(subscriptionId.toString(), context);
return SubscriptionModelDao.toSubscription(model);
}
});
+ return buildSubscription(shellSubscription, context);
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
- return buildBundleSubscriptions(bundleId, factory, getSubscriptionFromBundleId(bundleId, context), context);
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
+ return buildBundleSubscriptions(bundleId, getSubscriptionFromBundleId(bundleId, context), context);
}
private List<Subscription> getSubscriptionFromBundleId(final UUID bundleId, final InternalTenantContext context) {
@@ -248,7 +250,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
final String bundleKey, final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<Subscription>>() {
@Override
@@ -257,7 +259,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
if (bundleModel == null) {
return Collections.emptyList();
}
- return getSubscriptions(factory, bundleModel.getId(), context);
+ return getSubscriptions(bundleModel.getId(), context);
}
});
}
@@ -389,10 +391,10 @@ public class DefaultEntitlementDao implements EntitlementDao {
final EntitlementEventSqlDao eventsDaoFromSameTransaction = entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class);
for (final EntitlementEvent cur : initialEvents) {
eventsDaoFromSameTransaction.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
+
}
// Notify the Bus of the latest requested change, if needed
if (initialEvents.size() > 0) {
@@ -403,6 +405,8 @@ public class DefaultEntitlementDao implements EntitlementDao {
});
}
+
+
@Override
public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -412,11 +416,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
for (final EntitlementEvent cur : recreateEvents) {
transactional.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
}
// Notify the Bus of the latest requested change
@@ -428,6 +430,24 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ for (int i = 0; i < subscriptions.size(); i++) {
+ final SubscriptionData subscription = subscriptions.get(i);
+ final EntitlementEvent cancelEvent = cancelEvents.get(i);
+ cancelSubscriptionFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, context, i);
+ }
+ return null;
+ }
+ });
+ }
+
+
+
+ @Override
public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
@@ -498,10 +518,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
for (final EntitlementEvent cur : changeEventsTweakedWithMigrateBilling) {
transactional.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
}
// Notify the Bus of the latest requested change
@@ -608,10 +627,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
final UUID subscriptionId = subscription.getId();
cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class).create(new EntitlementEventModelDao(cancelEvent), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cancelEvent.getEffectiveDate(),
- new EntitlementNotificationKey(cancelEvent.getId(), seqId),
- context);
+
+ final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
+ recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, context);
// Notify the Bus of the requested change
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, context);
@@ -663,14 +681,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
}
- private Subscription buildSubscription(final SubscriptionFactory factory, final Subscription input, final InternalTenantContext context) {
+ private Subscription buildSubscription(final Subscription input, final InternalTenantContext context) {
if (input == null) {
return null;
}
final List<Subscription> bundleInput = new ArrayList<Subscription>();
if (input.getCategory() == ProductCategory.ADD_ON) {
- final Subscription baseSubscription = getBaseSubscription(factory, input.getBundleId(), false, context);
+ final Subscription baseSubscription = getBaseSubscription(input.getBundleId(), false, context);
if (baseSubscription == null) {
return null;
}
@@ -681,7 +699,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
bundleInput.add(input);
}
- final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), factory, bundleInput, context);
+ final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), bundleInput, context);
for (final Subscription cur : reloadedSubscriptions) {
if (cur.getId().equals(input.getId())) {
return cur;
@@ -691,7 +709,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
throw new EntitlementError("Unexpected code path in buildSubscription");
}
- private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final SubscriptionFactory factory, final List<Subscription> input, final InternalTenantContext context) {
+ private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final List<Subscription> input, final InternalTenantContext context) {
if (input == null || input.size() == 0) {
return Collections.emptyList();
}
@@ -714,7 +732,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
final List<Subscription> result = new ArrayList<Subscription>(input.size());
for (final Subscription cur : input) {
final List<EntitlementEvent> events = getEventsForSubscription(cur.getId(), context);
- Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ Subscription reloaded = createSubscriptionForInternalUse(cur, events);
switch (cur.getCategory()) {
case BASE:
@@ -752,7 +770,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
events.add(addOnCancelEvent);
// Finally reload subscription with full set of events
- reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ reloaded = createSubscriptionForInternalUse(cur, events);
}
break;
default:
@@ -838,26 +856,58 @@ public class DefaultEntitlementDao implements EntitlementDao {
});
}
- private Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
+ private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
+ final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
+ if (events.size() > 0) {
+ result.rebuildTransitions(events, catalogService.getFullCatalog());
+ }
+ return result;
+ }
+
+ private Subscription getBaseSubscription(final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
final List<Subscription> subscriptions = getSubscriptionFromBundleId(bundleId, context);
for (final Subscription cur : subscriptions) {
if (cur.getCategory() == ProductCategory.BASE) {
- return rebuildSubscription ? buildSubscription(factory, cur, context) : cur;
+ return rebuildSubscription ? buildSubscription(cur, context) : cur;
}
}
return null;
}
- private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
- final NotificationKey notificationKey, final InternalCallContext context) {
+
+ //
+ // Either records a notfication or sends a bus event is operation is immediate
+ //
+ private void recordBusOrFutureNotificationFromTransaction(final SubscriptionData subscription, final EntitlementEvent event, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final boolean busEvent,
+ final int seqId, final InternalCallContext context) {
+ if (busEvent) {
+ notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+ } else {
+ recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+ event.getEffectiveDate(),
+ new EntitlementNotificationKey(event.getId()),
+ context);
+ }
+ }
+
+ //
+ // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
+ // - CREATE,
+ // - IMM CANCEL or CHANGE
+ //
+ private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
+ final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
try {
- final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
- Engine.NOTIFICATION_QUEUE_NAME);
- subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
- } catch (NoSuchNotificationQueue e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
+
+ final SubscriptionTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
+ final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
+ context.getAccountRecordId(), context.getTenantRecordId());
+
+
+ eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+ } catch (EventBusException e) {
+ log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
}
}
@@ -870,6 +920,18 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
}
+ private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
+ final NotificationKey notificationKey, final InternalCallContext context) {
+ try {
+ final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
@@ -902,4 +964,20 @@ public class DefaultEntitlementDao implements EntitlementDao {
transBundleDao.create(new SubscriptionBundleModelDao(bundleData), context);
}
+
+ //
+ // Creates a copy of the existing subscriptions whose 'transitions' will reflect the new event
+ //
+ private SubscriptionData createSubscriptionWithNewEvent(final SubscriptionData subscription, EntitlementEvent newEvent) {
+
+ final SubscriptionData subscriptionWithNewEvent = new SubscriptionData(subscription, null, clock);
+ final List<EntitlementEvent> allEvents = new LinkedList<EntitlementEvent>();
+ if (subscriptionWithNewEvent.getEvents() != null) {
+ allEvents.addAll(subscriptionWithNewEvent.getEvents());
+ }
+ allEvents.add(newEvent);
+ subscriptionWithNewEvent.rebuildTransitions(allEvents, catalogService.getFullCatalog());
+ return subscriptionWithNewEvent;
+ }
+
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 1c17929..2b83ab2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -20,7 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
@@ -46,17 +45,17 @@ public interface EntitlementDao {
public SubscriptionBundle createSubscriptionBundle(SubscriptionBundleData bundle, InternalCallContext context);
- public Subscription getSubscriptionFromId(SubscriptionFactory factory, UUID subscriptionId, InternalTenantContext context);
+ public Subscription getSubscriptionFromId(UUID subscriptionId, InternalTenantContext context);
// ACCOUNT retrieval
public UUID getAccountIdFromSubscriptionId(UUID subscriptionId, InternalTenantContext context);
// Subscription retrieval
- public Subscription getBaseSubscription(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+ public Subscription getBaseSubscription(UUID bundleId, InternalTenantContext context);
- public List<Subscription> getSubscriptions(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+ public List<Subscription> getSubscriptions(UUID bundleId, InternalTenantContext context);
- public List<Subscription> getSubscriptionsForAccountAndKey(SubscriptionFactory factory, UUID accountId, String bundleKey, InternalTenantContext context);
+ public List<Subscription> getSubscriptionsForAccountAndKey(UUID accountId, String bundleKey, InternalTenantContext context);
// Update
public void updateChargedThroughDate(SubscriptionData subscription, InternalCallContext context);
@@ -79,6 +78,8 @@ public interface EntitlementDao {
public void cancelSubscription(SubscriptionData subscription, EntitlementEvent cancelEvent, InternalCallContext context, int cancelSeq);
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context);
+
public void uncancelSubscription(SubscriptionData subscription, List<EntitlementEvent> uncancelEvents, InternalCallContext context);
public void changePlan(SubscriptionData subscription, List<EntitlementEvent> changeEvents, InternalCallContext context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
index 0f0e18d..b208f18 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
@@ -21,8 +21,8 @@ import java.util.UUID;
import org.joda.time.DateTime;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.util.dao.TableName;
import com.ning.billing.util.entity.EntityBase;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index b4d3899..38695ac 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -176,6 +175,10 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+ }
+
+ @Override
public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
addEvents(subscription.getId(), changeEvents);
}
@@ -228,7 +231,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@@ -238,17 +241,17 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
final String bundleKey, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
index 459cae7..f3dc0b9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
@@ -24,7 +24,6 @@ import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.EntitlementService;
import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
@@ -32,12 +31,10 @@ import com.ning.billing.entitlement.api.timeline.DefaultEntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
import com.ning.billing.entitlement.api.timeline.RepairSubscriptionApiService;
-import com.ning.billing.entitlement.api.timeline.RepairSubscriptionFactory;
import com.ning.billing.entitlement.api.transfer.DefaultEntitlementTransferApi;
import com.ning.billing.entitlement.api.transfer.EntitlementTransferApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.core.Engine;
@@ -68,9 +65,6 @@ public class DefaultEntitlementModule extends AbstractModule implements Entitlem
protected void installEntitlementCore() {
- bind(SubscriptionFactory.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionFactory.class).asEagerSingleton();
- bind(SubscriptionFactory.class).to(DefaultSubscriptionFactory.class).asEagerSingleton();
-
bind(SubscriptionApiService.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionApiService.class).asEagerSingleton();
bind(SubscriptionApiService.class).to(DefaultSubscriptionApiService.class).asEagerSingleton();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
index 7a3ceaf..38c949f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
@@ -33,8 +33,8 @@ import com.ning.billing.catalog.api.PhaseType;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.io.VersionedCatalogLoader;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.util.config.CatalogConfig;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
@@ -164,7 +164,7 @@ public class TestPlanAligner extends KillbillTestSuite {
}
private SubscriptionData createSubscriptionStartedInThePast(final String productName, final PhaseType phaseType) {
- final DefaultSubscriptionFactory.SubscriptionBuilder builder = new DefaultSubscriptionFactory.SubscriptionBuilder();
+ final SubscriptionBuilder builder = new SubscriptionBuilder();
builder.setBundleStartDate(clock.getUTCNow().minusHours(10));
// Make sure to set the dates apart
builder.setAlignStartDate(new DateTime(builder.getBundleStartDate().plusHours(5)));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 0b95f47..bd05062 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -275,6 +275,10 @@ public abstract class TestApiBase extends EntitlementTestSuiteWithEmbeddedDB imp
new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSet, null),
requestedDate == null ? clock.getUTCNow() : requestedDate, callContext);
assertNotNull(subscription);
+
+
+ //try {Thread.sleep(100000000); } catch (Exception e) {};
+
assertTrue(testListener.isCompleted(5000));
return subscription;
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
index db3df23..8052a4f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
@@ -342,7 +342,7 @@ public class TestRepairBP extends TestApiBaseRepair {
assertEquals(dryRunBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
assertEquals(dryRunBaseSubscription.getBundleId(), bundle.getId());
- assertEquals(dryRunBaseSubscription.getStartDate(), baseSubscription.getStartDate());
+ assertTrue(dryRunBaseSubscription.getStartDate().compareTo(baseSubscription.getStartDate()) == 0);
Plan currentPlan = dryRunBaseSubscription.getCurrentPlan();
assertNotNull(currentPlan);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
index 2f21fe2..cf792cc 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
@@ -35,11 +35,11 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.EntitlementTestSuite;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
@@ -63,10 +63,10 @@ public class TestDefaultEntitlementTransferApi extends EntitlementTestSuite {
public void setUp() throws Exception {
final EntitlementDao dao = Mockito.mock(EntitlementDao.class);
final CatalogService catalogService = new MockCatalogService(new MockCatalog());
- final SubscriptionFactory subscriptionFactory = Mockito.mock(SubscriptionFactory.class);
+ final SubscriptionApiService apiService = Mockito.mock(SubscriptionApiService.class);
final EntitlementTimelineApi timelineApi = Mockito.mock(EntitlementTimelineApi.class);
final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(IDBI.class), clock);
- transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, subscriptionFactory, internalCallContextFactory);
+ transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, apiService, internalCallContextFactory);
}
@Test(groups = "fast")
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index 408a0f9..a7f3191 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -33,14 +33,13 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.catalog.api.TimeUnit;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
import com.ning.billing.entitlement.api.transfer.TransferCancelData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -141,10 +140,10 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
for (final Subscription cur : subscriptions) {
if (cur.getId().equals(subscriptionId)) {
- return buildSubscription(factory, (SubscriptionData) cur, context);
+ return buildSubscription((SubscriptionData) cur, context);
}
}
return null;
@@ -156,11 +155,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId, final String bundleKey, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final InternalTenantContext context) {
for (final SubscriptionBundle cur : bundles) {
if (cur.getExternalKey().equals(bundleKey) && cur.getAccountId().equals(bundleKey)) {
- return getSubscriptions(factory, cur.getId(), context);
+ return getSubscriptions(cur.getId(), context);
}
}
return Collections.emptyList();
@@ -175,7 +174,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()), context);
}
}
- final Subscription updatedSubscription = buildSubscription(null, subscription, context);
+ final Subscription updatedSubscription = buildSubscription(subscription, context);
subscriptions.add(updatedSubscription);
}
@@ -190,11 +189,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
final List<Subscription> results = new ArrayList<Subscription>();
for (final Subscription cur : subscriptions) {
if (cur.getBundleId().equals(bundleId)) {
- results.add(buildSubscription(factory, (SubscriptionData) cur, context));
+ results.add(buildSubscription((SubscriptionData) cur, context));
}
}
return results;
@@ -229,11 +228,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
for (final Subscription cur : subscriptions) {
if (cur.getBundleId().equals(bundleId) &&
cur.getCurrentPlan().getProduct().getCategory() == ProductCategory.BASE) {
- return buildSubscription(factory, (SubscriptionData) cur, context);
+ return buildSubscription((SubscriptionData) cur, context);
}
}
return null;
@@ -245,16 +244,13 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
insertEvent(nextPhase, context);
}
- private Subscription buildSubscription(final SubscriptionFactory factory, final SubscriptionData in, final InternalTenantContext context) {
- if (factory != null) {
- return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId(), context));
- } else {
+ private Subscription buildSubscription(final SubscriptionData in, final InternalTenantContext context) {
final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
if (events.size() > 0) {
subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
}
return subscription;
- }
+
}
@Override
@@ -284,6 +280,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+ synchronized (events) {
+ for (int i = 0; i < subscriptions.size(); i++) {
+ cancelSubscription(subscriptions.get(i), cancelEvents.get(i), context, 0);
+ }
+ }
+ }
+
+ @Override
public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
synchronized (events) {
cancelNextChangeEvent(subscription.getId());
@@ -303,7 +308,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
private void cancelNextPhaseEvent(final UUID subscriptionId, final InternalTenantContext context) {
- final Subscription curSubscription = getSubscriptionFromId(null, subscriptionId, context);
+ final Subscription curSubscription = getSubscriptionFromId(subscriptionId, context);
if (curSubscription.getCurrentPhase() == null ||
curSubscription.getCurrentPhase().getDuration().getUnit() == TimeUnit.UNLIMITED) {
return;
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index fff17a9..a580f01 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,6 +17,7 @@
package com.ning.billing.entitlement.glue;
import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.IDBI;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -28,6 +29,7 @@ import com.ning.billing.util.callcontext.MockCallContextSqlDao;
import com.ning.billing.util.glue.BusModule;
import com.ning.billing.util.glue.BusModule.BusType;
import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.name.Names;
@@ -44,6 +46,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
private void installNotificationQueue() {
bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
+ }
+
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
}
protected void installDBI() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index fa5da08..3f43198 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -305,6 +305,11 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
return generator.generateInvoice(account, invoice, manualPay);
}
+ @Override
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context) {
+ dao.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, internalCallContextFactory.createInternalCallContext(accountId, context));
+ }
+
private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final InternalCallContext context) {
try {
eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()), context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index 02b3ec5..f5aa44e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.invoice.dao;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -56,6 +57,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Ordering;
import com.google.inject.Inject;
public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, InvoiceApiException> implements InvoiceDao {
@@ -198,13 +200,16 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
transInvoiceItemSqlDao.create(invoiceItemModelDao, context);
}
+ // Now we check whether we generated any credit that could be used on some unpaid invoices
+ useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
+
notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions, context.getUserToken());
// Create associated payments
final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments, context);
- }
+ }
return null;
}
});
@@ -258,18 +263,12 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<InvoiceModelDao>>() {
@Override
public List<InvoiceModelDao> inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
- final List<InvoiceModelDao> invoices = getAllInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
- final Collection<InvoiceModelDao> unpaidInvoices = Collections2.filter(invoices, new Predicate<InvoiceModelDao>() {
- @Override
- public boolean apply(final InvoiceModelDao in) {
- return (InvoiceModelDaoHelper.getBalance(in).compareTo(BigDecimal.ZERO) >= 1) && (upToDate == null || !in.getTargetDate().isAfter(upToDate));
- }
- });
- return new ArrayList<InvoiceModelDao>(unpaidInvoices);
+ return getUnpaidInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, upToDate, context);
}
});
}
+
@Override
public UUID getInvoiceIdByPaymentId(final UUID paymentId, final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<UUID>() {
@@ -291,7 +290,6 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
}
@Override
-
public InvoicePaymentModelDao createRefund(final UUID paymentId, final BigDecimal requestedRefundAmount, final boolean isInvoiceAdjusted,
final Map<UUID, BigDecimal> invoiceItemIdsWithNullAmounts, final UUID paymentCookieId,
final InternalCallContext context)
@@ -367,6 +365,9 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
}
}
+ // Now we check whether we have any credit that could be used on some unpaid invoices (for which payment was just refunded)
+ useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
+
// Notify the bus since the balance of the invoice changed
notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, invoice.getId(), invoice.getAccountId(), context.getUserToken(), context);
@@ -495,18 +496,20 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
final InvoicePaymentModelDao payment = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class).getById(invoicePaymentId.toString(), context);
if (payment == null) {
throw new InvoiceApiException(ErrorCode.INVOICE_PAYMENT_NOT_FOUND, invoicePaymentId.toString());
- } else {
- final InvoicePaymentModelDao chargeBack = new InvoicePaymentModelDao(UUID.randomUUID(), context.getCreatedDate(), InvoicePaymentType.CHARGED_BACK,
- payment.getInvoiceId(), payment.getPaymentId(), context.getCreatedDate(),
- requestedChargedBackAmout.negate(), payment.getCurrency(), null, payment.getId());
- transactional.create(chargeBack, context);
+ }
+ final InvoicePaymentModelDao chargeBack = new InvoicePaymentModelDao(UUID.randomUUID(), context.getCreatedDate(), InvoicePaymentType.CHARGED_BACK,
+ payment.getInvoiceId(), payment.getPaymentId(), context.getCreatedDate(),
+ requestedChargedBackAmout.negate(), payment.getCurrency(), null, payment.getId());
+ transactional.create(chargeBack, context);
- // Notify the bus since the balance of the invoice changed
- final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
- notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, payment.getInvoiceId(), accountId, context.getUserToken(), context);
+ // Notify the bus since the balance of the invoice changed
+ final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
+ notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, payment.getInvoiceId(), accountId, context.getUserToken(), context);
- return chargeBack;
- }
+ // Now we check whether we have any credit that could be used on some unpaid invoices (for which payment was just charged back)
+ useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+
+ return chargeBack;
}
});
}
@@ -624,18 +627,8 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
}
populateChildren(invoice, entitySqlDaoWrapperFactory, context);
- final BigDecimal accountCbaAvailable = getAccountCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
- final BigDecimal balance = InvoiceModelDaoHelper.getBalance(invoice);
- if (accountCbaAvailable.compareTo(BigDecimal.ZERO) > 0 && balance.compareTo(BigDecimal.ZERO) > 0) {
- final BigDecimal cbaAmountToConsume = accountCbaAvailable.compareTo(balance) > 0 ? balance.negate() : accountCbaAvailable.negate();
- final InvoiceItemModelDao cbaAdjItem = new InvoiceItemModelDao(context.getCreatedDate(), InvoiceItemType.CBA_ADJ,
- invoice.getId(), invoice.getAccountId(),
- null, null, null, null,
- context.getCreatedDate().toLocalDate(),
- null, cbaAmountToConsume, null,
- invoice.getCurrency(), null);
- transInvoiceItemDao.create(cbaAdjItem, context);
- }
+ // Now we check whether we have any credit that could be used towards that charge
+ useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
// Notify the bus since the balance of the invoice changed
notifyBusOfInvoiceAdjustment(entitySqlDaoWrapperFactory, invoiceId, accountId, context.getUserToken(), context);
@@ -798,6 +791,68 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
});
}
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+ return null;
+ }
+ });
+ }
+
+ private void useExistingCBAFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws InvoiceApiException, EntityPersistenceException {
+
+ final BigDecimal accountCBA = getAccountCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+ if (accountCBA.compareTo(BigDecimal.ZERO) <= 0) {
+ return;
+ }
+
+ final List<InvoiceModelDao> unpaidInvoices = getUnpaidInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, null, context);
+ // We order the same os BillingStateCalculator-- should really share the comparator
+ final List<InvoiceModelDao> orderedUnpaidInvoices = Ordering.from(new Comparator<InvoiceModelDao>() {
+ @Override
+ public int compare(final InvoiceModelDao i1, final InvoiceModelDao i2) {
+ return i1.getInvoiceDate().compareTo(i2.getInvoiceDate());
+ }
+ }).immutableSortedCopy(unpaidInvoices);
+
+ BigDecimal remainingAccountCBA = accountCBA;
+ for (InvoiceModelDao cur : orderedUnpaidInvoices) {
+ final BigDecimal curInvoiceBalance = InvoiceModelDaoHelper.getBalance(cur);
+ final BigDecimal cbaToApplyOnInvoice = remainingAccountCBA.compareTo(curInvoiceBalance) <= 0 ? remainingAccountCBA : curInvoiceBalance;
+ remainingAccountCBA = remainingAccountCBA.subtract(cbaToApplyOnInvoice);
+
+
+ final InvoiceItemModelDao cbaAdjItem = new InvoiceItemModelDao(context.getCreatedDate(), InvoiceItemType.CBA_ADJ,
+ cur.getId(), cur.getAccountId(),
+ null, null, null, null,
+ context.getCreatedDate().toLocalDate(),
+ null, cbaToApplyOnInvoice.negate(), null,
+ cur.getCurrency(), null);
+
+ final InvoiceItemSqlDao transInvoiceItemDao = entitySqlDaoWrapperFactory.become(InvoiceItemSqlDao.class);
+ transInvoiceItemDao.create(cbaAdjItem, context);
+
+ if (remainingAccountCBA.compareTo(BigDecimal.ZERO) <= 0) {
+ break;
+ }
+ }
+ }
+
+
+ private List<InvoiceModelDao> getUnpaidInvoicesByAccountFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final LocalDate upToDate, final InternalTenantContext context) {
+ final List<InvoiceModelDao> invoices = getAllInvoicesByAccountFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+ final Collection<InvoiceModelDao> unpaidInvoices = Collections2.filter(invoices, new Predicate<InvoiceModelDao>() {
+ @Override
+ public boolean apply(final InvoiceModelDao in) {
+ return (InvoiceModelDaoHelper.getBalance(in).compareTo(BigDecimal.ZERO) >= 1) && (upToDate == null || !in.getTargetDate().isAfter(upToDate));
+ }
+ });
+ return new ArrayList<InvoiceModelDao>(unpaidInvoices);
+ }
+
+
/**
* Create an adjustment for a given invoice item. This just creates the object in memory, it doesn't write it to disk.
*
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
index dbaa474..3e76427 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
@@ -163,4 +163,11 @@ public interface InvoiceDao {
void deleteCBA(UUID accountId, UUID invoiceId, UUID invoiceItemId, InternalCallContext context) throws InvoiceApiException;
void notifyOfPayment(InvoicePaymentModelDao invoicePayment, InternalCallContext context);
+
+ /**
+ *
+ * @param accountId the account for which we need to rebalance the CBA
+ * @param context the callContext
+ */
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 757859d..464376d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -68,17 +68,6 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
@Override
public void initialize() throws NotificationQueueAlreadyExists {
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- };
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
@@ -108,8 +97,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- notificationQueueHandler,
- notificationConfig);
+ notificationQueueHandler);
}
@Override
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 8ca2f72..39e6e1d 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -67,16 +67,6 @@ public class InvoiceDaoTestBase extends InvoicingTestBase {
private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getSleepTimeMs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 36;
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 6861d13..9371dce 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -182,6 +182,10 @@ public class MockInvoiceDao implements InvoiceDao {
}
@Override
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+ }
+
+ @Override
public BigDecimal getAccountBalance(final UUID accountId, final InternalTenantContext context) {
BigDecimal balance = BigDecimal.ZERO;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
index 43269b5..736bfe3 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/TestInvoiceDao.java
@@ -631,14 +631,16 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
final boolean partialRefund = refundAmount.compareTo(amount) < 0;
final BigDecimal cba = invoiceDao.getAccountCBA(accountId, internalCallContext);
final InvoiceModelDao savedInvoice = invoiceDao.getById(invoice.getId(), internalCallContext);
- assertEquals(cba.compareTo(new BigDecimal("20.0")), 0);
+
+ final BigDecimal expectedCba = balance.compareTo(BigDecimal.ZERO) < 0 ? balance.negate() : BigDecimal.ZERO;
+ assertEquals(cba.compareTo(expectedCba), 0);
if (partialRefund) {
// IB = 20 (rec) - 20 (repair) + 20 (cba) - (20 -7) = 7; AB = IB - CBA = 7 - 20 = -13
assertEquals(balance.compareTo(new BigDecimal("-13.0")), 0);
- assertEquals(savedInvoice.getInvoiceItems().size(), 3);
+ assertEquals(savedInvoice.getInvoiceItems().size(), 4);
} else {
assertEquals(balance.compareTo(new BigDecimal("0.0")), 0);
- assertEquals(savedInvoice.getInvoiceItems().size(), 3);
+ assertEquals(savedInvoice.getInvoiceItems().size(), 4);
}
}
@@ -730,7 +732,8 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
balance = invoiceDao.getAccountBalance(accountId, internalCallContext);
assertEquals(balance.compareTo(expectedFinalBalance), 0);
cba = invoiceDao.getAccountCBA(accountId, internalCallContext);
- assertEquals(cba.compareTo(new BigDecimal("10.00")), 0);
+ final BigDecimal expectedCba = balance.compareTo(BigDecimal.ZERO) < 0 ? balance.negate() : BigDecimal.ZERO;
+ assertEquals(cba.compareTo(expectedCba), 0);
}
@Test(groups = "slow")
@@ -738,13 +741,8 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
final UUID accountId = UUID.randomUUID();
final UUID bundleId = UUID.randomUUID();
- final LocalDate targetDate1 = new LocalDate(2011, 10, 6);
- final Invoice invoice1 = new DefaultInvoice(accountId, clock.getUTCToday(), targetDate1, Currency.USD);
- createInvoice(invoice1, true, internalCallContext);
- // CREATE INVOICE WITH A (just) CBA. Should not happen, but that does not matter for that test
- final CreditBalanceAdjInvoiceItem cbaItem = new CreditBalanceAdjInvoiceItem(invoice1.getId(), accountId, new LocalDate(), new BigDecimal("20.0"), Currency.USD);
- createInvoiceItem(cbaItem, internalCallContext);
+ invoiceDao.insertCredit(accountId, null, new BigDecimal("20.0"), new LocalDate(), Currency.USD, internalCallContext);
final InvoiceItemModelDao charge = invoiceDao.insertExternalCharge(accountId, null, bundleId, "bla", new BigDecimal("15.0"), clock.getUTCNow().toLocalDate(), Currency.USD, internalCallContext);
@@ -1358,7 +1356,7 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
}
@Test(groups = "slow")
- public void testDeleteCBAPartiallyConsumed() throws Exception {
+ public void testRefundWithCBAPartiallyConsumed() throws Exception {
final UUID accountId = UUID.randomUUID();
// Create invoice 1
@@ -1377,6 +1375,12 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
final CreditBalanceAdjInvoiceItem creditBalanceAdjInvoiceItem1 = new CreditBalanceAdjInvoiceItem(fixedItem1.getInvoiceId(), fixedItem1.getAccountId(),
fixedItem1.getStartDate(), fixedItem1.getAmount(),
fixedItem1.getCurrency());
+
+ final UUID paymentId = UUID.randomUUID();
+ final DefaultInvoicePayment defaultInvoicePayment = new DefaultInvoicePayment(InvoicePaymentType.ATTEMPT, paymentId, invoice1.getId(), clock.getUTCNow().plusDays(12), new BigDecimal("10.0"), Currency.USD);
+
+ invoiceDao.notifyOfPayment(new InvoicePaymentModelDao(defaultInvoicePayment), internalCallContext);
+
createInvoice(invoice1, true, internalCallContext);
createInvoiceItem(fixedItem1, internalCallContext);
createInvoiceItem(repairAdjInvoiceItem, internalCallContext);
@@ -1398,20 +1402,20 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
// Verify scenario - half of the CBA should have been used
Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 5.00);
- verifyInvoice(invoice1.getId(), 10.00, 10.00);
+ verifyInvoice(invoice1.getId(), 0.00, 10.00);
verifyInvoice(invoice2.getId(), 0.00, -5.00);
- // Delete the CBA on invoice 1
- invoiceDao.deleteCBA(accountId, invoice1.getId(), creditBalanceAdjInvoiceItem1.getId(), internalCallContext);
+ // Refund Payment before we can deleted CBA
+ invoiceDao.createRefund(paymentId, new BigDecimal("10.0"), false, ImmutableMap.<UUID,BigDecimal>of(), UUID.randomUUID(), internalCallContext);
// Verify all three invoices were affected
Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
- verifyInvoice(invoice1.getId(), 0.00, 0.00);
- verifyInvoice(invoice2.getId(), 5.00, 0.00);
+ verifyInvoice(invoice1.getId(), 5.00, 5.00);
+ verifyInvoice(invoice2.getId(), 0.00, -5.00);
}
@Test(groups = "slow")
- public void testDeleteCBAFullyConsumedTwice() throws Exception {
+ public void testRefundCBAFullyConsumedTwice() throws Exception {
final UUID accountId = UUID.randomUUID();
// Create invoice 1
@@ -1435,6 +1439,13 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
createInvoiceItem(repairAdjInvoiceItem, internalCallContext);
createInvoiceItem(creditBalanceAdjInvoiceItem1, internalCallContext);
+
+ final BigDecimal paymentAmount = new BigDecimal("10.00");
+ final UUID paymentId = UUID.randomUUID();
+
+ final DefaultInvoicePayment defaultInvoicePayment = new DefaultInvoicePayment(InvoicePaymentType.ATTEMPT, paymentId, invoice1.getId(), clock.getUTCNow().plusDays(12), paymentAmount, Currency.USD);
+ invoiceDao.notifyOfPayment(new InvoicePaymentModelDao(defaultInvoicePayment), internalCallContext);
+
// Create invoice 2
// Scenario: single item
// * $5 item
@@ -1465,18 +1476,17 @@ public class TestInvoiceDao extends InvoiceDaoTestBase {
// Verify scenario - all CBA should have been used
Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
- verifyInvoice(invoice1.getId(), 10.00, 10.00);
+ verifyInvoice(invoice1.getId(), 0.00, 10.00);
verifyInvoice(invoice2.getId(), 0.00, -5.00);
verifyInvoice(invoice3.getId(), 0.00, -5.00);
- // Delete the CBA on invoice 1
- invoiceDao.deleteCBA(accountId, invoice1.getId(), creditBalanceAdjInvoiceItem1.getId(), internalCallContext);
+ invoiceDao.createRefund(paymentId, paymentAmount, false, ImmutableMap.<UUID, BigDecimal>of(), UUID.randomUUID(), internalCallContext);
// Verify all three invoices were affected
Assert.assertEquals(invoiceDao.getAccountCBA(accountId, internalCallContext).doubleValue(), 0.00);
- verifyInvoice(invoice1.getId(), 0.00, 0.00);
- verifyInvoice(invoice2.getId(), 5.00, 0.00);
- verifyInvoice(invoice3.getId(), 5.00, 0.00);
+ verifyInvoice(invoice1.getId(), 10.00, 10.00);
+ verifyInvoice(invoice2.getId(), 0.00, -5.00);
+ verifyInvoice(invoice3.getId(), 0.00, -5.00);
}
@Test(groups = "slow")
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
index 5864d95..9c4087a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
@@ -82,11 +82,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
final Clock clock = new DefaultClock();
final InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getSleepTimeMs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 36;
}
@@ -95,11 +90,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
public boolean isEmailNotificationsEnabled() {
return false;
}
-
- @Override
- public boolean isNotificationProcessingOff() {
- throw new UnsupportedOperationException();
- }
};
this.generator = new DefaultInvoiceGenerator(clock, invoiceConfig);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
index 1fd99d3..c5c7168 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
@@ -67,21 +67,11 @@ public class TestDefaultInvoiceGeneratorUnit extends InvoicingTestBase {
clock = new ClockMock();
gen = new TestDefaultInvoiceGeneratorMock(clock, new InvoiceConfig() {
@Override
- public boolean isNotificationProcessingOff() {
- return false;
- }
-
- @Override
public boolean isEmailNotificationsEnabled() {
return false;
}
@Override
- public long getSleepTimeMs() {
- return 100;
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 5;
}
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
index 0162c8f..f07276d 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
@@ -332,6 +332,27 @@ public class AccountResource extends JaxRsResourceBase {
return Response.status(Status.OK).build();
}
+
+ /*
+ * ************************** INVOICE CBA REBALANCING ********************************
+ */
+ @POST
+ @Path("/{accountId:" + UUID_PATTERN + "}/" + CBA_REBALANCING)
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON)
+ public Response rebalanceExistingCBAOnAccount(@PathParam("accountId") final String accountIdString,
+ @HeaderParam(HDR_CREATED_BY) final String createdBy,
+ @HeaderParam(HDR_REASON) final String reason,
+ @HeaderParam(HDR_COMMENT) final String comment,
+ @javax.ws.rs.core.Context final HttpServletRequest request) throws AccountApiException {
+
+ final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+ final UUID accountId = UUID.fromString(accountIdString);
+
+ invoiceApi.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, callContext);
+ return Response.status(Status.OK).build();
+ }
+
/*
* ************************** PAYMENTS ********************************
*/
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 8384cf9..7d0ec23 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -132,4 +132,7 @@ public interface JaxrsResource {
public static final String EXPORT = "export";
public static final String EXPORT_PATH = PREFIX + "/" + EXPORT;
+
+ public static final String CBA_REBALANCING = "cbaRebalancing";
+
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 67e783e..d107468 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -56,18 +56,6 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
@Override
public void initialize() {
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
-
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
- };
-
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
@@ -89,8 +77,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
try {
overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
OVERDUE_CHECK_NOTIFIER_QUEUE,
- notificationQueueHandler,
- notificationConfig);
+ notificationQueueHandler);
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index 34aca06..9f0955c 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -66,6 +66,7 @@ import com.ning.billing.util.email.templates.TemplateModule;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.globallocker.MySqlGlobalLocker;
import com.ning.billing.util.glue.BusModule;
+import com.ning.billing.util.glue.NotificationQueueModule;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
@@ -120,7 +121,6 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
super.configure();
bind(Clock.class).to(ClockMock.class).asEagerSingleton();
bind(CallContextFactory.class).to(DefaultCallContextFactory.class).asEagerSingleton();
- bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
final InvoiceConfig invoiceConfig = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
bind(InvoiceConfig.class).toInstance(invoiceConfig);
final CatalogConfig catalogConfig = new ConfigurationObjectFactory(System.getProperties()).build(CatalogConfig.class);
@@ -136,7 +136,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
install(new MockJunctionModule());
install(new EmailModule());
install(new TemplateModule());
-
+ install(new NotificationQueueModule());
final AccountInternalApi accountApi = Mockito.mock(AccountInternalApi.class);
bind(AccountInternalApi.class).toInstance(accountApi);
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
index b45819d..5ffb060 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -39,7 +39,7 @@ public class AutoPayRetryService extends BaseRetryService implements RetryServic
final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index d1b78cc..c2cda2d 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -47,16 +47,13 @@ public abstract class BaseRetryService implements RetryService {
private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
private final NotificationQueueService notificationQueueService;
- private final PaymentConfig config;
private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue retryQueue;
public BaseRetryService(final NotificationQueueService notificationQueueService,
- final PaymentConfig config,
final InternalCallContextFactory internalCallContextFactory) {
this.notificationQueueService = notificationQueueService;
- this.config = config;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -75,8 +72,7 @@ public abstract class BaseRetryService implements RetryService {
final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
retry(key.getUuidKey(), callContext);
}
- },
- config);
+ });
}
@Override
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 250987b..c9a80a1 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -45,7 +45,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 9217abd..2e76ea2 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -43,10 +43,9 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
@Inject
public PluginFailureRetryService(final NotificationQueueService notificationQueueService,
- final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
index a01dda5..0da7d2b 100644
--- a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
@@ -16,8 +16,6 @@
package com.ning.billing.payment.glue;
-import static org.testng.Assert.assertNotNull;
-
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
@@ -28,7 +26,6 @@ import org.skife.config.SimplePropertyConfigSource;
import org.skife.jdbi.v2.IDBI;
import com.ning.billing.ObjectType;
-import com.ning.billing.util.config.PaymentConfig;
import com.ning.billing.mock.glue.MockInvoiceModule;
import com.ning.billing.mock.glue.MockNotificationQueueModule;
import com.ning.billing.payment.dao.MockPaymentDao;
@@ -37,6 +34,7 @@ import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
import com.ning.billing.util.callcontext.CallContextSqlDao;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.MockCallContextSqlDao;
+import com.ning.billing.util.config.PaymentConfig;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.globallocker.MockGlobalLocker;
import com.ning.billing.util.glue.BusModule;
@@ -46,9 +44,11 @@ import com.ning.billing.util.svcapi.tag.TagInternalApi;
import com.ning.billing.util.tag.Tag;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+
+import static org.testng.Assert.assertNotNull;
public class PaymentTestModuleWithMocks extends PaymentModule {
+
public static final String PLUGIN_TEST_NAME = "my-mock";
private void loadSystemPropertiesFromClasspath(final String resource) {
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index 57122fe..f2fa6ab 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -59,6 +59,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
private final String hostname;
private final InternalCallContextFactory internalCallContextFactory;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -95,16 +97,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
this.hostname = Hostname.get();
this.internalCallContextFactory = internalCallContextFactory;
+ this.isStarted = false;
}
@Override
public void start() {
startQueue();
+ isStarted = true;
}
@Override
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -130,6 +135,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
diff --git a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
index aad7427..30c0ebd 100644
--- a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
@@ -19,14 +19,5 @@ package com.ning.billing.util.config;
import org.skife.config.Config;
import org.skife.config.Default;
-public interface EntitlementConfig extends NotificationConfig, KillbillConfig {
- @Override
- @Config("killbill.entitlement.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.entitlement.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
+public interface EntitlementConfig extends KillbillConfig {
}
diff --git a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
index a1f8d7b..73db4fc 100644
--- a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
@@ -19,17 +19,7 @@ package com.ning.billing.util.config;
import org.skife.config.Config;
import org.skife.config.Default;
-public interface InvoiceConfig extends NotificationConfig, KillbillConfig {
- @Override
- @Config("killbill.invoice.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.invoice.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
-
+public interface InvoiceConfig extends KillbillConfig {
@Config("killbill.invoice.maxNumberOfMonthsInFuture")
@Default("36")
public int getNumberOfMonthsInFuture();
diff --git a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
index 16a6e4b..64ae12d 100644
--- a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
@@ -22,7 +22,7 @@ import org.skife.config.Config;
import org.skife.config.Default;
-public interface PaymentConfig extends NotificationConfig, KillbillConfig {
+public interface PaymentConfig extends KillbillConfig {
@Config("killbill.payment.provider.default")
@Default("noop")
public String getDefaultPaymentProvider();
@@ -43,16 +43,6 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig {
@Default("8")
public int getPluginFailureRetryMaxAttempts();
- @Override
- @Config("killbill.payment.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.payment.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
-
@Config("killbill.payment.off")
@Default("false")
public boolean isPaymentOff();
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
index 557a6e1..7e4e1c9 100644
--- a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -16,15 +16,25 @@
package com.ning.billing.util.glue;
+import org.skife.config.ConfigurationObjectFactory;
+
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.AbstractModule;
public class NotificationQueueModule extends AbstractModule {
+
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
+ }
@Override
protected void configure() {
bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 66d6688..0a295ae 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,7 +55,6 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public List<Notification> getReadyNotifications(@Bind("now") Date now,
@Bind("owner") String owner,
@Bind("max") int max,
- @Bind("queueName") String queueName,
@InternalTenantContextBinder final InternalTenantContext context);
@SqlQuery
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index b054d93..2780faa 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,14 +17,10 @@
package com.ning.billing.util.notificationq;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import javax.annotation.Nullable;
-
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
@@ -32,10 +28,11 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.util.callcontext.CallOrigin;
+import com.ning.billing.util.Hostname;
+import com.ning.billing.util.config.NotificationConfig;
+
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.entity.dao.EntitySqlDao;
@@ -43,51 +40,41 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
-public class DefaultNotificationQueue extends NotificationQueueBase {
+public class DefaultNotificationQueue implements NotificationQueue {
private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
private final IDBI dbi;
private final NotificationSqlDao dao;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final String hostname;
- public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
- final NotificationQueueHandler handler, final NotificationConfig config,
- final InternalCallContextFactory internalCallContextFactory) {
- super(clock, svcName, queueName, handler, config);
- this.dbi = dbi;
- this.dao = dbi.onDemand(NotificationSqlDao.class);
- this.internalCallContextFactory = internalCallContextFactory;
- }
+ private final String svcName;
+ private final String queueName;
- @Override
- public int doProcessEvents() {
- logDebug("ENTER doProcessEvents");
- // Finding and claiming notifications is not done per tenant (yet?)
- final List<Notification> notifications = getReadyNotifications(createCallContext(null, null, null));
- if (notifications.size() == 0) {
- logDebug("EXIT doProcessEvents");
- return 0;
- }
+ private final ObjectMapper objectMapper;
- logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
- int result = 0;
- for (final Notification cur : notifications) {
- getNbProcessedEvents().incrementAndGet();
- logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
- result++;
- clearNotification(cur, createCallContext(cur.getUserToken(), cur.getTenantRecordId(), cur.getAccountRecordId()));
- logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- }
+ private final NotificationQueueHandler handler;
+
+ private final NotificationQueueService notificationQueueService;
- return result;
+ private volatile boolean isStarted;
+
+ public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+ final IDBI dbi, final NotificationQueueService notificationQueueService) {
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.dbi = dbi;
+ this.dao = dbi.onDemand(NotificationSqlDao.class);
+ this.hostname = Hostname.get();
+ this.notificationQueueService = notificationQueueService;
+ this.objectMapper = new ObjectMapper();
}
+
@Override
public void recordFutureNotification(final DateTime futureNotificationTime,
final NotificationKey notificationKey,
@@ -109,54 +96,14 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
final NotificationSqlDao thisDao,
final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- // Create a new user token. This will be used in the future, when this notification is triggered, to trace
- // generated bus events
final UUID futureUserToken = UUID.randomUUID();
- final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
- context.getUserToken(), futureUserToken, futureNotificationTime,
- context.getAccountRecordId(), context.getTenantRecordId());
- thisDao.insertNotification(notification, context);
- }
- private void clearNotification(final Notification cleared, final InternalCallContext context) {
- dao.clearNotification(cleared.getId().toString(), getHostname(), context);
+ final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
+ context.getUserToken(), futureUserToken, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
+ thisDao.insertNotification(notification, context);
}
- private List<Notification> getReadyNotifications(final InternalCallContext context) {
- final Date now = getClock().getUTCNow().toDate();
- final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
- final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
-
- final List<Notification> claimedNotifications = new ArrayList<Notification>();
- for (final Notification cur : input) {
- logDebug("about to claim notification %s, key = %s for time %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
-
- final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
- logDebug("claimed notification %s, key = %s for time %s result = %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
- if (claimed) {
- claimedNotifications.add(cur);
- dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
- }
- }
-
- for (final Notification cur : claimedNotifications) {
- if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
- log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
- }
- }
-
- return claimedNotifications;
- }
-
- private void logDebug(final String format, final Object... args) {
- if (log.isDebugEnabled()) {
- final String realDebug = String.format(format, args);
- log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
- }
- }
@Override
public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
@@ -190,7 +137,42 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
dao.removeNotification(notificationId.toString(), context);
}
- private InternalCallContext createCallContext(final UUID userToken, @Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
- return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
+ @Override
+ public String getFullQName() {
+ return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
+ }
+
+ @Override
+ public String getServiceName() {
+ return svcName;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ notificationQueueService.startQueue();
+ isStarted = true;
+ }
+
+ @Override
+ public void stopQueue() {
+ // Order matters...
+ isStarted = false;
+ notificationQueueService.stopQueue();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
}
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..d56023c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.google.inject.Inject;
public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
private final IDBI dbi;
- private final InternalCallContextFactory internalCallContextFactory;
@Inject
- public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
- super(clock);
+ public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
this.dbi = dbi;
- this.internalCallContextFactory = internalCallContextFactory;
}
+
@Override
protected NotificationQueue createNotificationQueueInternal(final String svcName,
final String queueName,
- final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+ final NotificationQueueHandler handler) {
+ return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index 22cb36a..58b49c4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -25,6 +25,7 @@ import org.joda.time.DateTime;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.queue.QueueLifecycle;
public interface NotificationQueue extends QueueLifecycle {
@@ -66,13 +67,6 @@ public interface NotificationQueue extends QueueLifecycle {
public void removeNotification(final UUID notificationId,
final InternalCallContext context);
- /**
- * This is only valid when the queue has been configured with isNotificationProcessingOff is true
- * In which case, it will callback users for all the ready notifications.
- *
- * @return the number of entries we processed
- */
- public int processReadyNotification();
/**
* @return the name of that queue
@@ -88,4 +82,6 @@ public interface NotificationQueue extends QueueLifecycle {
* @return the queue name associated
*/
public String getQueueName();
+
+ public NotificationQueueHandler getHandler();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index f83a1ec..cae7775 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -21,8 +21,9 @@ import java.util.UUID;
import org.joda.time.DateTime;
import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
public interface NotificationQueueHandler {
@@ -61,12 +62,11 @@ public interface NotificationQueueService {
* @param svcName the name of the service using that queue
* @param queueName a name for that queue (unique per service)
* @param handler the handler required for notifying the caller of state change
- * @param config the notification queue configuration
* @return a new NotificationQueue
* @throws com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists
* is the queue associated with that service and name already exits
*/
- public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+ public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler)
throws NotificationQueueAlreadyExists;
/**
@@ -92,8 +92,7 @@ public interface NotificationQueueService {
throws NoSuchNotificationQueue;
/**
- * @param services
* @return the number of processed notifications
*/
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+ public int triggerManualQueueProcessing(final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..99dd74f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -17,39 +17,29 @@
package com.ning.billing.util.notificationq;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.skife.jdbi.v2.IDBI;
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
-import com.google.common.base.Joiner;
import com.google.inject.Inject;
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
- protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
- protected final Clock clock;
-
- private final Map<String, NotificationQueue> queues;
@Inject
- public NotificationQueueServiceBase(final Clock clock) {
- this.clock = clock;
- this.queues = new TreeMap<String, NotificationQueue>();
+ public NotificationQueueServiceBase(final Clock clock, final NotificationQueueConfig config, final IDBI dbi,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
}
@Override
public NotificationQueue createNotificationQueue(final String svcName,
final String queueName,
- final NotificationQueueHandler handler,
- final NotificationConfig config) throws NotificationQueueAlreadyExists {
- if (svcName == null || queueName == null || handler == null || config == null) {
+ final NotificationQueueHandler handler) throws NotificationQueueAlreadyExists {
+ if (svcName == null || queueName == null || handler == null) {
throw new RuntimeException("Need to specify all parameters");
}
@@ -61,7 +51,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
svcName, queueName));
}
- result = createNotificationQueueInternal(svcName, queueName, handler, config);
+ result = createNotificationQueueInternal(svcName, queueName, handler);
queues.put(compositeName, result);
}
return result;
@@ -96,33 +86,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("NotificationQueueServiceBase");
+ sb.append("{queues=").append(queues);
+ sb.append('}');
+ return sb.toString();
+ }
+
+
//
// Test ONLY
//
@Override
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+ public int triggerManualQueueProcessing(final Boolean keepRunning) {
int result = 0;
- List<NotificationQueue> manualQueues = null;
- if (services == null) {
- manualQueues = new ArrayList<NotificationQueue>(queues.values());
- } else {
- final Joiner join = Joiner.on(",");
- join.join(services);
-
- log.info("Trigger manual processing for services {} ", join.toString());
- manualQueues = new LinkedList<NotificationQueue>();
- synchronized (queues) {
- for (final String svc : services) {
- addQueuesForService(manualQueues, svc);
- }
- }
- }
+ List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
for (final NotificationQueue cur : manualQueues) {
int processedNotifications = 0;
do {
- processedNotifications = cur.processReadyNotification();
+ doProcessEventsWithLimit(1);
log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
result += processedNotifications;
} while (keepRunning && processedNotifications > 0);
@@ -130,28 +116,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
return result;
}
- private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
- for (final String cur : queues.keySet()) {
- if (cur.startsWith(svcName)) {
- result.add(queues.get(cur));
- }
- }
- }
-
protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
- String queueName, NotificationQueueHandler handler,
- NotificationConfig config);
-
- public static String getCompositeName(final String svcName, final String queueName) {
- return svcName + ":" + queueName;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("NotificationQueueServiceBase");
- sb.append("{queues=").append(queues);
- sb.append('}');
- return sb.toString();
- }
+ String queueName, NotificationQueueHandler handler);
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final int nbThreads;
private final Executor executor;
- private final String svcName;
+ private final String svcQName;
private final long sleepTimeMs;
private boolean isProcessingEvents;
private int curActiveThreads;
protected final ObjectMapper objectMapper;
-
- public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+ public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
- this.svcName = svcName;
+ this.svcQName = svcQName;
this.sleepTimeMs = config.getSleepTimeMs();
this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
}
+
@Override
public void startQueue() {
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
log.info(String.format("%s: Starting with %d threads",
- svcName, nbThreads));
+ svcQName, nbThreads));
for (int i = 0; i < nbThreads; i++) {
executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
public void run() {
log.info(String.format("%s: Thread %s [%d] starting",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()));
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
doProcessEvents();
} catch (Exception e) {
log.warn(String.format("%s: Thread %s [%d] got an exception, catching and moving on...",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()), e);
}
sleepALittle();
}
} catch (InterruptedException e) {
- log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
} catch (Throwable e) {
- log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+ log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
} finally {
- log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
synchronized (thePersistentQ) {
curActiveThreads--;
}
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
if (!success) {
- log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
} else {
- log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
}
} catch (InterruptedException e) {
- log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+ log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
}
}
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
} catch (InterruptedException ignore) {
- log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
} finally {
if (remaining > 0) {
- log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
} else {
- log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
}
curActiveThreads = 0;
}
}
-
+
protected <T> T deserializeEvent(final String className, final String json) {
try {
final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
}
+ public abstract int doProcessEvents();
+ public abstract boolean isStarted();
-
- @Override
- public abstract int doProcessEvents();
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
*/
public void stopQueue();
- /**
- * Processes event from queue
- */
- public int doProcessEvents();
+ public boolean isStarted();
}
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 647fa9c..74aaee2 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -142,7 +142,7 @@ CREATE TABLE notifications (
PRIMARY KEY(record_id)
) ENGINE=innodb;
CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 3b3aefc..e55a423 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -24,7 +24,6 @@ getReadyNotifications() ::= <<
FORCE INDEX (idx_comp_where)
where
effective_date \<= :now
- and queue_name = :queueName
and processing_state != 'PROCESSED'
and processing_state != 'REMOVED'
and (processing_owner IS NULL OR processing_available_date \<= :now)
diff --git a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
index aed75f5..93ef001 100644
--- a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
+++ b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
@@ -16,16 +16,24 @@
package com.ning.billing.mock.glue;
+import org.skife.config.ConfigurationObjectFactory;
+
import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.AbstractModule;
public class MockNotificationQueueModule extends AbstractModule {
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
+ }
@Override
protected void configure() {
bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 264f10c..6b07167 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -73,7 +73,7 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
Thread.sleep(1000);
final DateTime now = new DateTime();
- final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+ final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
assertNotNull(notifications);
assertEquals(notifications.size(), 1);
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 2ed6079..ac267a7 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -25,23 +25,38 @@ import java.util.UUID;
import org.joda.time.DateTime;
+import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import com.fasterxml.jackson.databind.ObjectMapper;
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final String hostname;
private final TreeSet<Notification> notifications;
+ private final Clock clock;
+ private final String svcName;
+ private final String queueName;
+ private final NotificationQueueHandler handler;
+ private final MockNotificationQueueService queueService;
+
+ private volatile boolean isStarted;
+
+ public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final MockNotificationQueueService mockNotificationQueueService) {
+
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.clock = clock;
+ this.hostname = Hostname.get();
+ this.queueService = mockNotificationQueueService;
- public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
- super(clock, svcName, queueName, handler, config);
notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@Override
public int compare(final Notification o1, final Notification o2) {
@@ -57,7 +72,7 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
@Override
public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, context.getUserToken(),
+ final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, context.getUserToken(),
UUID.randomUUID(), futureNotificationTime, null, 0L);
synchronized (notifications) {
notifications.add(notification);
@@ -70,81 +85,112 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
recordFutureNotification(futureNotificationTime, notificationKey, context);
}
- public List<Notification> getPendingEvents() {
- final List<Notification> result = new ArrayList<Notification>();
+ @Override
+ public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+ final List<Notification> toClearNotifications = new ArrayList<Notification>();
for (final Notification notification : notifications) {
- if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
- result.add(notification);
+ if (notification.getNotificationKey().equals(key.toString())) {
+ toClearNotifications.add(notification);
}
}
- return result;
+ synchronized (notifications) {
+ if (toClearNotifications.size() > 0) {
+ notifications.removeAll(toClearNotifications);
+ }
+ }
}
@Override
- public int doProcessEvents() {
- final int result;
- final List<Notification> processedNotifications = new ArrayList<Notification>();
- final List<Notification> oldNotifications = new ArrayList<Notification>();
-
- final List<Notification> readyNotifications = new ArrayList<Notification>();
+ public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+ /*
+ final List<Notification> result = new ArrayList<Notification>();
synchronized (notifications) {
- for (final Notification cur : notifications) {
- if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
- readyNotifications.add(cur);
+ for (Notification cur : notifications) {
+ if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+ result.add(cur);
}
}
}
+ return result;
+ */
+ return null;
+ }
- result = readyNotifications.size();
- for (final Notification cur : readyNotifications) {
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
- final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
- "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
- PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
- cur.getNotificationKey(), cur.getUserToken(), UUID.randomUUID(), cur.getEffectiveDate(),
- cur.getAccountRecordId(), cur.getTenantRecordId());
- oldNotifications.add(cur);
- processedNotifications.add(processedNotification);
- }
+ @Override
+ public void removeNotification(final UUID notificationId, final InternalCallContext context) {
synchronized (notifications) {
- if (oldNotifications.size() > 0) {
- notifications.removeAll(oldNotifications);
- }
-
- if (processedNotifications.size() > 0) {
- notifications.addAll(processedNotifications);
+ for (Notification cur : notifications) {
+ if (cur.getId().equals(notificationId)) {
+ notifications.remove(cur);
+ break;
+ }
}
}
- return result;
}
@Override
- public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
- final List<Notification> toClearNotifications = new ArrayList<Notification>();
- for (final Notification notification : notifications) {
- if (notification.getNotificationKey().equals(key.toString())) {
- toClearNotifications.add(notification);
- }
- }
+ public String getFullQName() {
+ return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+ }
- synchronized (notifications) {
- if (toClearNotifications.size() > 0) {
- notifications.removeAll(toClearNotifications);
- }
- }
+ @Override
+ public String getServiceName() {
+ return svcName;
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- return null;
+ public String getQueueName() {
+ return queueName;
}
@Override
- public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ isStarted = true;
+ queueService.startQueue();
+ }
+
+ @Override
+ public void stopQueue() {
+ isStarted = false;
+ queueService.stopQueue();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
+
+ public List<Notification> getReadyNotifications() {
+ final int result;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ final List<Notification> readyNotifications = new ArrayList<Notification>();
+ synchronized (notifications) {
+ for (final Notification cur : notifications) {
+ if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ readyNotifications.add(cur);
+ }
+ }
+ }
+ return readyNotifications;
}
+
+ public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded) {
+ synchronized (notifications) {
+ notifications.removeAll(toBeremoved);
+ notifications.addAll(toBeAdded);
+ }
+ }
+
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..0284747 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
package com.ning.billing.util.notificationq;
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import com.google.inject.Inject;
public class MockNotificationQueueService extends NotificationQueueServiceBase {
@Inject
- public MockNotificationQueueService(final Clock clock) {
- super(clock);
+ public MockNotificationQueueService(final Clock clock, final NotificationQueueConfig config) {
+ super(clock, config, null, null);
+ }
+
+
+ @Override
+ protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+ final NotificationQueueHandler handler) {
+ return new MockNotificationQueue(clock, svcName, queueName, handler, this);
}
+
@Override
- protected NotificationQueue createNotificationQueueInternal(final String svcName,
- final String queueName, final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+ public int doProcessEvents() {
+
+ int result = 0;
+
+ for (NotificationQueue cur : queues.values()) {
+ result += doProcessEventsForQueue((MockNotificationQueue) cur);
+ }
+ return result;
+ }
+
+ private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+ int result = 0;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ List<Notification> readyNotifications = queue.getReadyNotifications();
+ for (final Notification cur : readyNotifications) {
+ final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
+ final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+ "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+ PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+ cur.getNotificationKey(), cur.getUserToken(), cur.getFutureUserToken(), cur.getEffectiveDate(),
+ cur.getAccountRecordId(), cur.getTenantRecordId());
+ oldNotifications.add(cur);
+ processedNotifications.add(processedNotification);
+ result++;
+ }
+
+ queue.markProcessedNotifications(oldNotifications, processedNotifications);
+ return result;
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 03ea6ae..09be990 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -36,10 +36,8 @@ import org.testng.annotations.Test;
import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -60,6 +58,7 @@ import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.testng.Assert.assertEquals;
+
@Guice(modules = TestNotificationQueue.TestNotificationQueueModule.class)
public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
@@ -76,6 +75,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
@Inject
private Clock clock;
+ @Inject
+ NotificationQueueService queueService;
+
private int eventsReceived;
private static final class TestNotificationKey implements NotificationKey, Comparable<TestNotificationKey> {
@@ -96,6 +98,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
public int compareTo(TestNotificationKey arg0) {
return value.compareTo(arg0.value);
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(value);
+ return sb.toString();
+ }
}
@BeforeSuite(groups = "slow")
@@ -123,20 +132,20 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- log.info("Handler received key: " + notificationKey);
-
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 1, 10000),
- new InternalCallContextFactory(dbi, clock));
+
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey);
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ });
queue.startQueue();
@@ -149,9 +158,13 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
expectedNotifications.put(notificationKey, Boolean.FALSE);
// Insert dummy to be processed in 2 sec'
- entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>()
+
+ {
@Override
- public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ public Void inTransaction(
+ final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws
+ Exception {
entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKey, internalCallContext);
@@ -159,40 +172,56 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
return null;
}
- });
+ }
+
+ );
// Move time in the future after the notification effectiveDate
- ((ClockMock) clock).setDeltaFromReality(3000);
+ ((ClockMock) clock).
+
+ setDeltaFromReality(3000);
// Notification should have kicked but give it at least a sec' for thread scheduling
- await().atMost(1, MINUTES).until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return expectedNotifications.get(notificationKey);
- }
- });
+ await()
+
+ .
+
+ atMost(1, MINUTES)
+
+ .
+
+ until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws
+ Exception {
+ return expectedNotifications.get(notificationKey);
+ }
+ }
+
+ );
queue.stopQueue();
Assert.assertTrue(expectedNotifications.get(notificationKey));
}
@Test(groups = "slow")
- public void testManyNotifications() throws InterruptedException {
+ public void testManyNotifications() throws Exception {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey.toString());
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ });
queue.startQueue();
final DateTime now = clock.getUTCNow();
@@ -205,7 +234,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DummyObject obj = new DummyObject("foo", key);
final int currentIteration = i;
- final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+ final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
expectedNotifications.put(notificationKey, Boolean.FALSE);
entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -243,7 +272,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
success = true;
break;
}
- //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
expectedNotifications.wait(1000);
}
} while (nbTry-- > 0);
@@ -269,40 +298,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
- final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
- final NotificationConfig config = new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return false;
- }
-
- @Override
- public long getSleepTimeMs() {
- return 10;
- }
- };
-
- final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
+ final NotificationQueue queueFred = queueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
log.info("Fred received key: " + notificationKey);
expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
- },
- config);
+ });
- final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+ final NotificationQueue queueBarney = queueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
@Override
public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
log.info("Barney received key: " + notificationKey);
expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
- },
- config);
-
+ });
queueFred.startQueue();
// We don't start Barney so it can never pick up notifications
@@ -353,40 +366,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
}
- NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
- return new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return off;
- }
-
- @Override
- public long getSleepTimeMs() {
- return sleepTime;
- }
- };
- }
@Test(groups = "slow")
- public void testRemoveNotifications() throws InterruptedException {
+ public void testRemoveNotifications() throws Exception {
final UUID key = UUID.randomUUID();
final NotificationKey notificationKey = new TestNotificationKey(key.toString());
final UUID key2 = UUID.randomUUID();
final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
- if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
- log.info("Received notification with key: " + notificationKey);
- eventsReceived++;
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "remove",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+ log.info("Received notification with key: " + notificationKey);
+ eventsReceived++;
+ }
+ }
+ });
queue.startQueue();
final DateTime start = clock.getUTCNow().plusHours(1);
@@ -424,11 +423,26 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
queue.stopQueue();
}
+
+ static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
+ return new NotificationQueueConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return sleepTime;
+ }
+ };
+ }
+
public static class TestNotificationQueueModule extends AbstractModule {
@Override
protected void configure() {
- bind(Clock.class).to(ClockMock.class);
+ bind(Clock.class).to(ClockMock.class).asEagerSingleton();
final MysqlTestingHelper helper = KillbillTestSuiteWithEmbeddedDB.getMysqlTestingHelper();
bind(MysqlTestingHelper.class).toInstance(helper);
@@ -436,6 +450,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
bind(IDBI.class).toInstance(dbi);
final IDBI otherDbi = helper.getDBI();
bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+ bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
}
}
}