killbill-memoizeit
Merge remote-tracking branch 'origin/master' into meter Conflicts: overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java Signed-off-by: …
Changes
beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java 8(+4 -4)
entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java 3(+2 -1)
entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java 46(+22 -24)
entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java 44(+26 -18)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java 58(+42 -16)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java 14(+13 -1)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionFactory.java 63(+0 -63)
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java 25(+15 -10)
entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java 33(+15 -18)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java 1(+1 -0)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java 37(+16 -21)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java 65(+62 -3)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionBuilder.java 251(+104 -147)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java 174(+126 -48)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java 2(+1 -1)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java 13(+8 -5)
entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java 8(+4 -4)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java 41(+23 -18)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java 22(+5 -17)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java 10(+5 -5)
invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java 10(+0 -10)
invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java 10(+0 -10)
invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java 5(+2 -3)
meter/pom.xml 3(+2 -1)
overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java 22(+5 -17)
util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java 15(+8 -7)
util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueDispatcher.java 209(+170 -39)
util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java 82(+23 -59)
Details
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index 7bd0ae1..c80e303 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -44,7 +44,6 @@ import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -58,9 +57,11 @@ import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.config.CatalogConfig;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
+
public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private BusinessAccountTagSqlDao accountTagSqlDao;
@@ -73,6 +74,19 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
private EntitlementUserApi entitlementUserApi;
private BusinessTagDao tagDao;
+
+ private NotificationQueueConfig config = new NotificationQueueConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return false;
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return 3000;
+ }
+ };
+
@BeforeMethod(groups = "slow")
public void setUp() throws Exception {
final Clock clock = new ClockMock();
@@ -89,13 +103,12 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
accountUserApi = new DefaultAccountUserApi(callContextFactory, internalCallContextFactory, accountDao);
final CatalogService catalogService = new DefaultCatalogService(Mockito.mock(CatalogConfig.class), Mockito.mock(VersionedCatalogLoader.class));
final AddonUtils addonUtils = new AddonUtils(catalogService);
- final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, internalCallContextFactory);
+ final DefaultNotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, config, internalCallContextFactory);
final EntitlementDao entitlementDao = new DefaultEntitlementDao(dbi, clock, addonUtils, notificationQueueService, eventBus, catalogService);
final PlanAligner planAligner = new PlanAligner(catalogService);
- final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, internalCallContextFactory);
- final DefaultSubscriptionFactory subscriptionFactory = new DefaultSubscriptionFactory(apiService, clock, catalogService);
- entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, subscriptionFactory);
- entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, subscriptionFactory, addonUtils, internalCallContextFactory);
+ final DefaultSubscriptionApiService apiService = new DefaultSubscriptionApiService(clock, entitlementDao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+ entitlementApi = new DefaultEntitlementInternalApi(entitlementDao, apiService, clock, catalogService);
+ entitlementUserApi = new DefaultEntitlementUserApi(clock, entitlementDao, catalogService, apiService, addonUtils, internalCallContextFactory);
tagDao = new BusinessTagDao(accountTagSqlDao, invoicePaymentTagSqlDao, invoiceTagSqlDao, subscriptionTransitionTagSqlDao,
accountApi, entitlementApi);
diff --git a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
index e04fed7..5ecdbb6 100644
--- a/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
+++ b/api/src/main/java/com/ning/billing/invoice/api/InvoiceUserApi.java
@@ -292,4 +292,12 @@ public interface InvoiceUserApi {
* @throws InvoiceApiException
*/
public String getInvoiceAsHTML(UUID invoiceId, TenantContext context) throws AccountApiException, IOException, InvoiceApiException;
+
+ /**
+ * Rebalance CBA for account which have credit and unpaid invoices-- only needed if system is configured to not rebalance automatically.
+ *
+ * @param accountId account id
+ * @param context the callcontext
+ */
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context);
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
index ef98c42..517ea8d 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/BeatrixListener.java
@@ -140,7 +140,7 @@ public class BeatrixListener {
default:
}
return eventBusType != null ?
- new ExtBusEventEntry(hostname, objectType, objectId, eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
+ new ExtBusEventEntry(hostname, objectType, objectId, event.getUserToken(), eventBusType, event.getAccountRecordId(), event.getAccountRecordId()) : null;
}
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
index b882a7c..a34e62f 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusEventEntry.java
@@ -37,12 +37,12 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
private final ObjectType objectType;
private final UUID objectId;
private final ExtBusEventType extBusType;
-
+ private final UUID userToken;
public ExtBusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
- final PersistentQueueEntryLifecycleState processingState,
- final ObjectType objectType, final UUID objectId, final ExtBusEventType extBusType,
- final Long accountRecordId, final Long tenantRecordId) {
+ final PersistentQueueEntryLifecycleState processingState,
+ final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+ final Long accountRecordId, final Long tenantRecordId) {
this.id = id;
this.createdOwner = createdOwner;
this.owner = owner;
@@ -51,14 +51,14 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
this.objectType = objectType;
this.objectId = objectId;
this.extBusType = extBusType;
+ this.userToken = userToken;
this.accountRecordId = accountRecordId;
this.tenantRecordId = tenantRecordId;
}
- public ExtBusEventEntry(final String createdOwner,
- final ObjectType objectType, final UUID objectId, final ExtBusEventType extBusType,
- final Long accountRecordId, final Long tenantRecordId) {
- this(0, createdOwner, null, null, null, objectType, objectId, extBusType, accountRecordId, tenantRecordId);
+ public ExtBusEventEntry(final String createdOwner, final ObjectType objectType, final UUID objectId, final UUID userToken, final ExtBusEventType extBusType,
+ final Long accountRecordId, final Long tenantRecordId) {
+ this(0, createdOwner, null, null, null, objectType, objectId, userToken, extBusType, accountRecordId, tenantRecordId);
}
public long getId() {
@@ -78,6 +78,11 @@ public class ExtBusEventEntry implements PersistentQueueEntryLifecycle {
}
@Override
+ public UUID getUserToken() {
+ return userToken;
+ }
+
+ @Override
public String getOwner() {
return owner;
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
index 2f3c3d0..f174158 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.java
@@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+
package com.ning.billing.beatrix.extbus.dao;
import java.sql.ResultSet;
@@ -42,43 +43,41 @@ import com.ning.billing.util.dao.BinderBase;
import com.ning.billing.util.dao.MapperBase;
import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
-
@ExternalizedSqlViaStringTemplate3()
public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
-
@SqlQuery
@Mapper(ExtBusSqlMapper.class)
public ExtBusEventEntry getNextBusExtEventEntry(@Bind("max") int max,
- @Bind("owner") String owner,
- @Bind("now") Date now,
- @InternalTenantContextBinder final InternalTenantContext context);
+ @Bind("owner") String owner,
+ @Bind("now") Date now,
+ @InternalTenantContextBinder final InternalTenantContext context);
@SqlUpdate
public int claimBusExtEvent(@Bind("owner") String owner,
- @Bind("nextAvailable") Date nextAvailable,
- @Bind("recordId") Long id,
- @Bind("now") Date now,
- @InternalTenantContextBinder final InternalCallContext context);
+ @Bind("nextAvailable") Date nextAvailable,
+ @Bind("recordId") Long id,
+ @Bind("now") Date now,
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
public void clearBusExtEvent(@Bind("recordId") Long id,
- @Bind("owner") String owner,
- @InternalTenantContextBinder final InternalCallContext context);
+ @Bind("owner") String owner,
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
public void removeBusExtEventsById(@Bind("recordId") Long id,
- @InternalTenantContextBinder final InternalCallContext context);
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
public void insertBusExtEvent(@Bind(binder = ExtBusSqlBinder.class) ExtBusEventEntry evt,
- @InternalTenantContextBinder final InternalCallContext context);
+ @InternalTenantContextBinder final InternalCallContext context);
@SqlUpdate
public void insertClaimedExtHistory(@Bind("ownerId") String owner,
- @Bind("claimedDate") Date claimedDate,
- @Bind("busEventId") long id,
- @InternalTenantContextBinder final InternalCallContext context);
+ @Bind("claimedDate") Date claimedDate,
+ @Bind("busEventId") long id,
+ @InternalTenantContextBinder final InternalCallContext context);
public static class ExtBusSqlBinder extends BinderBase implements Binder<Bind, ExtBusEventEntry> {
@@ -87,6 +86,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
stmt.bind("eventType", evt.getExtBusType().toString());
stmt.bind("objectId", evt.getObjectId().toString());
stmt.bind("objectType", evt.getObjectType().toString());
+ stmt.bind("userToken", getUUIDString(evt.getUserToken()));
stmt.bind("createdDate", getDate(new DateTime()));
stmt.bind("creatingOwner", evt.getCreatedOwner());
stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -104,6 +104,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
final ExtBusEventType eventType = ExtBusEventType.valueOf(r.getString("event_type"));
final UUID objectId = getUUID(r, "object_id");
final ObjectType objectType = ObjectType.valueOf(r.getString("object_type"));
+ final UUID userToken = getUUID(r, "user_token");
final String createdOwner = r.getString("creating_owner");
final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
final String processingOwner = r.getString("processing_owner");
@@ -111,7 +112,7 @@ public interface ExtBusSqlDao extends Transactional<ExtBusSqlDao>, CloseMe {
final Long accountRecordId = r.getLong("account_record_id");
final Long tenantRecordId = r.getLong("tenant_record_id");
return new ExtBusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState,
- objectType, objectId, eventType, accountRecordId, tenantRecordId);
+ objectType, objectId, userToken, eventType, accountRecordId, tenantRecordId);
}
}
}
diff --git a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
index 81c832a..eddf4b5 100644
--- a/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
+++ b/beatrix/src/main/java/com/ning/billing/beatrix/extbus/PersistentExternalBus.java
@@ -63,6 +63,8 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
private final InternalCallContextFactory internalCallContextFactory;
private final AccountInternalApi accountApi;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -90,10 +92,12 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
public void start() {
startQueue();
+ isStarted = true;
}
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -119,6 +123,11 @@ public class PersistentExternalBus extends PersistentQueueBase implements Extern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private final UUID getAccountIdFromRecordId(final Long recordId, final InternalCallContext context) {
try {
final Account account = accountApi.getAccountByRecordId(recordId, context);
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
index 655d4e0..d4b955c 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/ddl.sql
@@ -6,6 +6,7 @@ CREATE TABLE bus_ext_events (
event_type varchar(32) NOT NULL,
object_id varchar(64) NOT NULL,
object_type varchar(32) NOT NULL,
+ user_token char(36),
created_date datetime NOT NULL,
creating_owner char(50) NOT NULL,
processing_owner char(50) DEFAULT NULL,
diff --git a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
index 897d273..c1acfa4 100644
--- a/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
+++ b/beatrix/src/main/resources/com/ning/billing/beatrix/extbus/dao/ExtBusSqlDao.sql.stg
@@ -9,6 +9,7 @@ getNextBusExtEventEntry() ::= <<
, event_type
, object_id
, object_type
+ , user_token
, created_date
, creating_owner
, processing_owner
@@ -66,6 +67,7 @@ insertBusExtEvent() ::= <<
event_type
, object_id
, object_type
+ , user_token
, created_date
, creating_owner
, processing_owner
@@ -76,7 +78,8 @@ insertBusExtEvent() ::= <<
) values (
:eventType
, :objectId
- , :objectType
+ , :objectType
+ , :userToken
, :createdDate
, :creatingOwner
, :processingOwner
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
index 38b3a50..fcd10f8 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/TestOverdueIntegration.java
@@ -179,17 +179,17 @@ public class TestOverdueIntegration extends TestOverdueBase {
new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 31), InvoiceItemType.REPAIR_ADJ, new BigDecimal("-249.95")),
new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("249.95")));
invoiceChecker.checkInvoice(account.getId(), 4, callContext,
- // Note the end date here is not 07-25, but 07-9. The overdue configuration disabled invoicing between 07-09 and 07-23 (e.g. the bundle
+ // Note the end date here is not 07-25, but 07-10. The overdue configuration disabled invoicing between 07-10 and 07-23 (e.g. the bundle
// was inaccessible, hence we didn't want to charge the customer for that period, even though the account was overdue).
- new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 9), InvoiceItemType.RECURRING, new BigDecimal("74.99")),
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 6, 30), new LocalDate(2012, 7, 10), InvoiceItemType.RECURRING, new BigDecimal("83.31")),
// Item for the upgraded recurring plan
new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 31), InvoiceItemType.RECURRING, new BigDecimal("154.85")),
// Credits consumed
- new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-229.84")));
+ new ExpectedInvoiceItemCheck(new LocalDate(2012, 7, 23), new LocalDate(2012, 7, 23), InvoiceItemType.CBA_ADJ, new BigDecimal("-238.16")));
invoiceChecker.checkChargedThroughDate(baseSubscription.getId(), new LocalDate(2012, 7, 31), callContext);
// Verify the account balance: 249.95 - 74.99 - 154.85
- assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-20.11")), 0);
+ assertEquals(invoiceUserApi.getAccountBalance(account.getId(), callContext).compareTo(new BigDecimal("-11.79")), 0);
}
@Test(groups = "slow")
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
new file mode 100644
index 0000000..39d636b
--- /dev/null
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/EntitlementApiBase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2010-2012 Ning, Inc.
+ *
+ * Ning licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.ning.billing.entitlement.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.dao.EntitlementDao;
+import com.ning.billing.entitlement.events.EntitlementEvent;
+import com.ning.billing.util.clock.Clock;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+public class EntitlementApiBase {
+
+ protected final EntitlementDao dao;
+
+ protected final SubscriptionApiService apiService;
+ protected final Clock clock;
+ protected final CatalogService catalogService;
+
+ public EntitlementApiBase(final EntitlementDao dao, final SubscriptionApiService apiService, final Clock clock, final CatalogService catalogService) {
+ this.dao = dao;
+ this.apiService = apiService;
+ this.clock = clock;
+ this.catalogService = catalogService;
+ }
+
+ protected List<Subscription> createSubscriptionsForApiUse(final List<Subscription> internalSubscriptions) {
+ return new ArrayList<Subscription>(Collections2.transform(internalSubscriptions, new Function<Subscription, Subscription>() {
+ @Override
+ public Subscription apply(final Subscription subscription) {
+ return createSubscriptionForApiUse((SubscriptionData) subscription);
+ }
+ }));
+ }
+
+ protected SubscriptionData createSubscriptionForApiUse(final Subscription internalSubscription) {
+ return new SubscriptionData((SubscriptionData) internalSubscription, apiService, clock);
+ }
+
+ protected SubscriptionData createSubscriptionForApiUse(SubscriptionBuilder builder, List<EntitlementEvent> events) {
+ final SubscriptionData subscription = new SubscriptionData(builder, apiService, clock);
+ if (events.size() > 0) {
+ subscription.rebuildTransitions(events, catalogService.getFullCatalog());
+ }
+ return subscription;
+ }
+}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
index a6eb3d9..30bb487 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/AccountMigrationData.java
@@ -20,7 +20,8 @@ import java.util.List;
import org.joda.time.DateTime;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.events.EntitlementEvent;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
index e836d59..1b42632 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/migration/DefaultEntitlementMigrationApi.java
@@ -25,13 +25,15 @@ import java.util.UUID;
import org.joda.time.DateTime;
+import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
import com.ning.billing.entitlement.alignment.TimedMigration;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
@@ -54,24 +56,20 @@ import com.ning.billing.util.clock.Clock;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
-public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
+public class DefaultEntitlementMigrationApi extends EntitlementApiBase implements EntitlementMigrationApi {
- private final EntitlementDao dao;
private final MigrationPlanAligner migrationAligner;
- private final SubscriptionFactory factory;
- private final Clock clock;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementMigrationApi(final MigrationPlanAligner migrationAligner,
- final SubscriptionFactory factory,
+ final SubscriptionApiService apiService,
+ final CatalogService catalogService,
final EntitlementDao dao,
final Clock clock,
final InternalCallContextFactory internalCallContextFactory) {
- this.dao = dao;
+ super(dao, apiService, clock, catalogService);
this.migrationAligner = migrationAligner;
- this.factory = factory;
- this.clock = clock;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -141,13 +139,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
final DateTime migrationStartDate = events[0].getEventTime();
final List<EntitlementEvent> emptyEvents = Collections.emptyList();
- final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(bundleId)
- .setCategory(productCategory)
- .setBundleStartDate(migrationStartDate)
- .setAlignStartDate(migrationStartDate),
- emptyEvents);
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(bundleId)
+ .setCategory(productCategory)
+ .setBundleStartDate(migrationStartDate)
+ .setAlignStartDate(migrationStartDate),
+ emptyEvents);
return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
}
@@ -157,13 +155,13 @@ public class DefaultEntitlementMigrationApi implements EntitlementMigrationApi {
final TimedMigration[] events = migrationAligner.getEventsMigration(input, now);
final DateTime migrationStartDate = events[0].getEventTime();
final List<EntitlementEvent> emptyEvents = Collections.emptyList();
- final SubscriptionData subscriptionData = factory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(bundleId)
- .setCategory(productCategory)
- .setBundleStartDate(bundleStartDate)
- .setAlignStartDate(migrationStartDate),
- emptyEvents);
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(bundleId)
+ .setCategory(productCategory)
+ .setBundleStartDate(bundleStartDate)
+ .setAlignStartDate(migrationStartDate),
+ emptyEvents);
return new SubscriptionMigrationData(subscriptionData, toEvents(subscriptionData, now, ctd, events, context), ctd);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
index 4721c66..db0af82 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/SubscriptionApiService.java
@@ -23,10 +23,11 @@ import com.ning.billing.catalog.api.BillingPeriod;
import com.ning.billing.catalog.api.PhaseType;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
public interface SubscriptionApiService {
@@ -54,4 +55,6 @@ public interface SubscriptionApiService {
public boolean changePlanWithPolicy(SubscriptionData subscription, String productName, BillingPeriod term,
String priceList, DateTime requestedDate, ActionPolicy policy, CallContext context)
throws EntitlementUserApiException;
+
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
index 1596030..f4b0420 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/svcs/DefaultEntitlementInternalApi.java
@@ -29,17 +29,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.ErrorCode;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.catalog.api.CatalogService;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
+import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -48,18 +51,17 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
+public class DefaultEntitlementInternalApi extends EntitlementApiBase implements EntitlementInternalApi {
private final Logger log = LoggerFactory.getLogger(DefaultEntitlementInternalApi.class);
- private final EntitlementDao dao;
- private final SubscriptionFactory subscriptionFactory;
@Inject
public DefaultEntitlementInternalApi(final EntitlementDao dao,
- final SubscriptionFactory subscriptionFactory) {
- this.dao = dao;
- this.subscriptionFactory = subscriptionFactory;
+ final DefaultSubscriptionApiService apiService,
+ final Clock clock,
+ final CatalogService catalogService) {
+ super(dao, apiService, clock, catalogService);
}
@Override
@@ -68,26 +70,31 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
}
@Override
- public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final InternalTenantContext context) {
- return dao.getSubscriptions(subscriptionFactory, bundleId, context);
+ public List<Subscription> getSubscriptionsForBundle(UUID bundleId,
+ InternalTenantContext context) {
+ final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, context);
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
- public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, context);
+ public Subscription getBaseSubscription(UUID bundleId,
+ InternalTenantContext context) throws EntitlementUserApiException {
+ final Subscription result = dao.getBaseSubscription(bundleId, context);
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
- public Subscription getSubscriptionFromId(final UUID id, final InternalTenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, context);
+
+ public Subscription getSubscriptionFromId(UUID id,
+ InternalTenantContext context) throws EntitlementUserApiException {
+ final Subscription result = dao.getSubscriptionFromId(id, context);
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
@@ -105,8 +112,9 @@ public class DefaultEntitlementInternalApi implements EntitlementInternalApi {
}
@Override
- public void setChargedThroughDate(final UUID subscriptionId, final LocalDate localChargedThruDate, final InternalCallContext context) {
- final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, context);
+ public void setChargedThroughDate(UUID subscriptionId,
+ LocalDate localChargedThruDate, InternalCallContext context) {
+ final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionId, context);
final DateTime chargedThroughDate = localChargedThruDate.toDateTime(new LocalTime(subscription.getStartDate(), DateTimeZone.UTC), DateTimeZone.UTC);
final SubscriptionBuilder builder = new SubscriptionBuilder(subscription)
.setChargedThroughDate(chargedThroughDate)
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
index f61d2a6..c44617a 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
@@ -16,6 +16,7 @@
package com.ning.billing.entitlement.api.timeline;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -25,21 +26,25 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import javax.annotation.Nullable;
+
import org.joda.time.DateTime;
import com.ning.billing.ErrorCode;
import com.ning.billing.catalog.api.CatalogApiException;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.NewEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
@@ -47,17 +52,21 @@ import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.TenantContext;
+import com.ning.billing.util.clock.Clock;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.inject.Inject;
import com.google.inject.name.Named;
-public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
+public class DefaultEntitlementTimelineApi extends EntitlementApiBase implements EntitlementTimelineApi {
- private final EntitlementDao dao;
- private final SubscriptionFactory factory;
private final RepairEntitlementLifecycleDao repairDao;
private final CatalogService catalogService;
private final InternalCallContextFactory internalCallContextFactory;
+ private final AddonUtils addonUtils;
+
+ private final SubscriptionApiService repairApiService;
private enum RepairType {
BASE_REPAIR,
@@ -66,14 +75,17 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
@Inject
- public DefaultEntitlementTimelineApi(@Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionFactory factory, final CatalogService catalogService,
+ public DefaultEntitlementTimelineApi(final CatalogService catalogService,
+ final SubscriptionApiService apiService,
@Named(DefaultEntitlementModule.REPAIR_NAMED) final RepairEntitlementLifecycleDao repairDao, final EntitlementDao dao,
- final InternalCallContextFactory internalCallContextFactory) {
+ @Named(DefaultEntitlementModule.REPAIR_NAMED) final SubscriptionApiService repairApiService,
+ final InternalCallContextFactory internalCallContextFactory, final Clock clock, final AddonUtils addonUtils) {
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
- this.dao = dao;
this.repairDao = repairDao;
- this.factory = factory;
this.internalCallContextFactory = internalCallContextFactory;
+ this.repairApiService = repairApiService;
+ this.addonUtils = addonUtils;
}
@Override
@@ -101,7 +113,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
if (bundle == null) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_UNKNOWN_BUNDLE, descBundle);
}
- final List<Subscription> subscriptions = dao.getSubscriptions(factory, bundle.getId(), internalCallContextFactory.createInternalTenantContext(context));
+ final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(bundle.getId(), internalCallContextFactory.createInternalTenantContext(context)));
if (subscriptions.size() == 0) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, bundle.getId());
}
@@ -113,6 +125,18 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
+ private List<SubscriptionDataRepair> convertToSubscriptionsDataRepair(List<Subscription> input) {
+ return new ArrayList<SubscriptionDataRepair>(Collections2.transform(input, new Function<Subscription, SubscriptionDataRepair>() {
+ @Override
+ public SubscriptionDataRepair apply(@Nullable final Subscription subscription) {
+ return convertToSubscriptionDataRepair((SubscriptionData) subscription);
+ }
+ }));
+ }
+ private SubscriptionDataRepair convertToSubscriptionDataRepair(SubscriptionData input) {
+ return new SubscriptionDataRepair(input, repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+ }
+
@Override
public BundleTimeline repairBundle(final BundleTimeline input, final boolean dryRun, final CallContext context) throws EntitlementRepairException {
final InternalTenantContext tenantContext = internalCallContextFactory.createInternalTenantContext(context);
@@ -123,7 +147,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
// Subscriptions are ordered with BASE subscription first-- if exists
- final List<Subscription> subscriptions = dao.getSubscriptions(factory, input.getId(), tenantContext);
+ final List<SubscriptionDataRepair> subscriptions = convertToSubscriptionsDataRepair(dao.getSubscriptions(input.getId(), tenantContext));
if (subscriptions.size() == 0) {
throw new EntitlementRepairException(ErrorCode.ENT_REPAIR_NO_ACTIVE_SUBSCRIPTIONS, input.getId());
}
@@ -252,7 +276,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+ private void validateBasePlanRecreate(final boolean isBasePlanRecreate, final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
throws EntitlementRepairException {
if (!isBasePlanRecreate) {
return;
@@ -269,7 +293,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- private void validateInputSubscriptionsKnown(final List<Subscription> subscriptions, final List<SubscriptionTimeline> input)
+ private void validateInputSubscriptionsKnown(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> input)
throws EntitlementRepairException {
for (final SubscriptionTimeline cur : input) {
boolean found = false;
@@ -365,7 +389,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
return result;
}
- private String getViewId(final DateTime lastUpdateBundleDate, final List<Subscription> subscriptions) {
+ private String getViewId(final DateTime lastUpdateBundleDate, final List<SubscriptionDataRepair> subscriptions) {
final StringBuilder tmp = new StringBuilder();
long lastOrderedId = -1;
for (final Subscription cur : subscriptions) {
@@ -412,7 +436,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
};
}
- private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<Subscription> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
+ private List<SubscriptionTimeline> createGetSubscriptionRepairList(final List<SubscriptionDataRepair> subscriptions, final List<SubscriptionTimeline> inRepair) throws CatalogApiException {
final List<SubscriptionTimeline> result = new LinkedList<SubscriptionTimeline>();
final Set<UUID> repairIds = new TreeSet<UUID>();
@@ -464,7 +488,9 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
}
}
- return (SubscriptionDataRepair) factory.createSubscription(builder, initialEvents);
+ final SubscriptionDataRepair subscriptiondataRepair = new SubscriptionDataRepair(builder, curData.getEvents(), repairApiService, (EntitlementDao) repairDao, clock, addonUtils, catalogService, internalCallContextFactory);
+ subscriptiondataRepair.rebuildTransitions(curData.getEvents(), catalogService.getFullCatalog());
+ return subscriptiondataRepair;
}
private SubscriptionTimeline findAndCreateSubscriptionRepair(final UUID target, final List<SubscriptionTimeline> input) {
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
index 1870bfe..9f5d397 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/RepairSubscriptionApiService.java
@@ -16,12 +16,17 @@
package com.ning.billing.entitlement.api.timeline;
+import org.joda.time.DateTime;
+
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
+import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.glue.DefaultEntitlementModule;
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
@@ -35,7 +40,14 @@ public class RepairSubscriptionApiService extends DefaultSubscriptionApiService
@Named(DefaultEntitlementModule.REPAIR_NAMED) final EntitlementDao dao,
final CatalogService catalogService,
final PlanAligner planAligner,
+ final AddonUtils addonUtils,
final InternalCallContextFactory internalCallContextFactory) {
- super(clock, dao, catalogService, planAligner, internalCallContextFactory);
+ super(clock, dao, catalogService, planAligner, addonUtils, internalCallContextFactory);
+ }
+
+ // Nothing to do for repair as we pass all the repair events in the stream
+ @Override
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+ return 0;
}
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
index 301f15f..310469c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/SubscriptionDataRepair.java
@@ -24,7 +24,6 @@ import org.joda.time.DateTime;
import com.ning.billing.ErrorCode;
import com.ning.billing.ObjectType;
-import com.ning.billing.catalog.api.Catalog;
import com.ning.billing.catalog.api.CatalogApiException;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.Plan;
@@ -33,8 +32,8 @@ import com.ning.billing.catalog.api.Product;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransition;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -59,8 +58,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
private final List<EntitlementEvent> initialEvents;
private final InternalCallContextFactory internalCallContextFactory;
- // Low level events are ONLY used for Repair APIs
- private List<EntitlementEvent> events;
public SubscriptionDataRepair(final SubscriptionBuilder builder, final List<EntitlementEvent> initialEvents, final SubscriptionApiService apiService,
final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
@@ -74,6 +71,20 @@ public class SubscriptionDataRepair extends SubscriptionData {
this.internalCallContextFactory = internalCallContextFactory;
}
+
+
+ public SubscriptionDataRepair(final SubscriptionData subscriptionData, final SubscriptionApiService apiService,
+ final EntitlementDao dao, final Clock clock, final AddonUtils addonUtils, final CatalogService catalogService,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(subscriptionData, apiService , clock);
+ this.repairDao = dao;
+ this.addonUtils = addonUtils;
+ this.clock = clock;
+ this.catalogService = catalogService;
+ this.initialEvents = subscriptionData.getEvents();
+ this.internalCallContextFactory = internalCallContextFactory;
+ }
+
DateTime getLastUserEventEffectiveDate() {
DateTime res = null;
for (final EntitlementEvent cur : events) {
@@ -185,12 +196,6 @@ public class SubscriptionDataRepair extends SubscriptionData {
}
}
- @Override
- public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
- this.events = inputEvents;
- super.rebuildTransitions(inputEvents, catalog);
- }
-
public List<EntitlementEvent> getEvents() {
return events;
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
index 466d466..651b297 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/transfer/DefaultEntitlementTransferApi.java
@@ -29,7 +29,8 @@ import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.PlanPhase;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.BundleTimeline;
@@ -37,7 +38,8 @@ import com.ning.billing.entitlement.api.timeline.EntitlementRepairException;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -58,22 +60,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
+public class DefaultEntitlementTransferApi extends EntitlementApiBase implements EntitlementTransferApi {
- private final Clock clock;
- private final EntitlementDao dao;
private final CatalogService catalogService;
- private final SubscriptionFactory subscriptionFactory;
private final EntitlementTimelineApi timelineApi;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementTransferApi(final Clock clock, final EntitlementDao dao, final EntitlementTimelineApi timelineApi, final CatalogService catalogService,
- final SubscriptionFactory subscriptionFactory, final InternalCallContextFactory internalCallContextFactory) {
- this.clock = clock;
- this.dao = dao;
+ final SubscriptionApiService apiService, final InternalCallContextFactory internalCallContextFactory) {
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
- this.subscriptionFactory = subscriptionFactory;
this.timelineApi = timelineApi;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -220,7 +217,7 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
DateTime bundleStartdate = null;
for (final SubscriptionTimeline cur : bundleTimeline.getSubscriptions()) {
- final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, cur.getId(), fromInternalCallContext);
+ final SubscriptionData oldSubscription = (SubscriptionData) dao.getSubscriptionFromId(cur.getId(), fromInternalCallContext);
final List<ExistingEvent> existingEvents = cur.getExistingEvents();
final ProductCategory productCategory = existingEvents.get(0).getPlanPhaseSpecifier().getProductCategory();
if (productCategory == ProductCategory.ADD_ON) {
@@ -254,13 +251,13 @@ public class DefaultEntitlementTransferApi implements EntitlementTransferApi {
}
// Create the new subscription for the new bundle on the new account
- final SubscriptionData subscriptionData = subscriptionFactory.createSubscription(new SubscriptionBuilder()
- .setId(UUID.randomUUID())
- .setBundleId(subscriptionBundleData.getId())
- .setCategory(productCategory)
- .setBundleStartDate(effectiveTransferDate)
- .setAlignStartDate(subscriptionAlignStartDate),
- ImmutableList.<EntitlementEvent>of());
+ final SubscriptionData subscriptionData = createSubscriptionForApiUse(new SubscriptionBuilder()
+ .setId(UUID.randomUUID())
+ .setBundleId(subscriptionBundleData.getId())
+ .setCategory(productCategory)
+ .setBundleStartDate(effectiveTransferDate)
+ .setAlignStartDate(subscriptionAlignStartDate),
+ ImmutableList.<EntitlementEvent>of());
final List<EntitlementEvent> events = toEvents(existingEvents, subscriptionData, effectiveTransferDate, context);
final SubscriptionMigrationData curData = new SubscriptionMigrationData(subscriptionData, events, null);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
index 32d968c..4272038 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEffectiveSubscriptionEvent.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DefaultEffectiveSubscriptionEvent extends DefaultSubscriptionEvent implements EffectiveSubscriptionInternalEvent {
+
public DefaultEffectiveSubscriptionEvent(final SubscriptionTransitionData in, final DateTime startDate, final Long accountRecordId, final Long tenantRecordId) {
super(in, startDate, accountRecordId, tenantRecordId);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
index d7b8d6e..d3d128c 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
@@ -33,8 +33,7 @@ import com.ning.billing.catalog.api.PlanPhase;
import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.EntitlementApiBase;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
import com.ning.billing.entitlement.api.user.SubscriptionStatusDryRun.DryRunChangeReason;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
@@ -48,26 +47,19 @@ import com.ning.billing.util.clock.DefaultClock;
import com.google.inject.Inject;
-public class DefaultEntitlementUserApi implements EntitlementUserApi {
+public class DefaultEntitlementUserApi extends EntitlementApiBase implements EntitlementUserApi {
- private final Clock clock;
- private final EntitlementDao dao;
private final CatalogService catalogService;
- private final DefaultSubscriptionApiService apiService;
private final AddonUtils addonUtils;
- private final SubscriptionFactory subscriptionFactory;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultEntitlementUserApi(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
- final DefaultSubscriptionApiService apiService, final SubscriptionFactory subscriptionFactory,
+ final DefaultSubscriptionApiService apiService,
final AddonUtils addonUtils, final InternalCallContextFactory internalCallContextFactory) {
- this.clock = clock;
- this.apiService = apiService;
- this.dao = dao;
+ super(dao, apiService, clock, catalogService);
this.catalogService = catalogService;
this.addonUtils = addonUtils;
- this.subscriptionFactory = subscriptionFactory;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -82,11 +74,11 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public Subscription getSubscriptionFromId(final UUID id, final TenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getSubscriptionFromId(subscriptionFactory, id, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription result = dao.getSubscriptionFromId(id, internalCallContextFactory.createInternalTenantContext(context));
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, id);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
@Override
@@ -111,23 +103,26 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) {
- return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> internalSubscriptions = dao.getSubscriptionsForAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
public List<Subscription> getSubscriptionsForBundle(final UUID bundleId, final TenantContext context) {
- return dao.getSubscriptions(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> internalSubscriptions = dao.getSubscriptions(bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ return createSubscriptionsForApiUse(internalSubscriptions);
}
@Override
public Subscription getBaseSubscription(final UUID bundleId, final TenantContext context) throws EntitlementUserApiException {
- final Subscription result = dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription result = dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_GET_NO_SUCH_BASE_SUBSCRIPTION, bundleId);
}
- return result;
+ return createSubscriptionForApiUse(result);
}
+
@Override
public SubscriptionBundle createBundleForAccount(final UUID accountId, final String bundleName, final CallContext context)
throws EntitlementUserApiException {
@@ -162,7 +157,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
}
DateTime bundleStartDate = null;
- final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(subscriptionFactory, bundleId, internalCallContextFactory.createInternalTenantContext(context));
+ final SubscriptionData baseSubscription = (SubscriptionData) dao.getBaseSubscription(bundleId, internalCallContextFactory.createInternalTenantContext(context));
switch (plan.getProduct().getCategory()) {
case BASE:
if (baseSubscription != null) {
@@ -231,7 +226,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
public List<SubscriptionStatusDryRun> getDryRunChangePlanStatus(final UUID subscriptionId, @Nullable final String baseProductName,
final DateTime requestedDate, final TenantContext context)
throws EntitlementUserApiException {
- final Subscription subscription = dao.getSubscriptionFromId(subscriptionFactory, subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
+ final Subscription subscription = dao.getSubscriptionFromId(subscriptionId, internalCallContextFactory.createInternalTenantContext(context));
if (subscription == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_SUBSCRIPTION_ID, subscriptionId);
}
@@ -241,7 +236,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
final List<SubscriptionStatusDryRun> result = new LinkedList<SubscriptionStatusDryRun>();
- final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscriptionFactory, subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
+ final List<Subscription> bundleSubscriptions = dao.getSubscriptions(subscription.getBundleId(), internalCallContextFactory.createInternalTenantContext(context));
for (final Subscription cur : bundleSubscriptions) {
if (cur.getId().equals(subscriptionId)) {
continue;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
index 3dd5850..d60a898 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultSubscriptionApiService.java
@@ -17,6 +17,8 @@
package com.ning.billing.entitlement.api.user;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
@@ -37,11 +39,12 @@ import com.ning.billing.catalog.api.PlanSpecifier;
import com.ning.billing.catalog.api.PriceList;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.Product;
+import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.alignment.TimedPhase;
import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
+import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
import com.ning.billing.entitlement.events.phase.PhaseEvent;
@@ -68,18 +71,22 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
private final EntitlementDao dao;
private final CatalogService catalogService;
private final PlanAligner planAligner;
+ private final AddonUtils addonUtils;
private final InternalCallContextFactory internalCallContextFactory;
@Inject
public DefaultSubscriptionApiService(final Clock clock, final EntitlementDao dao, final CatalogService catalogService,
- final PlanAligner planAligner, final InternalCallContextFactory internalCallContextFactory) {
+ final PlanAligner planAligner, final AddonUtils addonUtils,
+ final InternalCallContextFactory internalCallContextFactory) {
this.clock = clock;
this.catalogService = catalogService;
this.planAligner = planAligner;
this.dao = dao;
+ this.addonUtils = addonUtils;
this.internalCallContextFactory = internalCallContextFactory;
}
+
@Override
public SubscriptionData createPlan(final SubscriptionBuilder builder, final Plan plan, final PhaseType initialPhase,
final String realPriceList, final DateTime requestedDate, final DateTime effectiveDate, final DateTime processedDate,
@@ -215,6 +222,9 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
final InternalCallContext internalCallContext = createCallContextFromBundleId(subscription.getBundleId(), context);
dao.cancelSubscription(subscription, cancelEvent, internalCallContext, 0);
subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+
+ cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
return (policy == ActionPolicy.IMMEDIATE);
}
@@ -354,9 +364,58 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
dao.changePlan(subscription, changeEvents, internalCallContext);
subscription.rebuildTransitions(dao.getEventsForSubscription(subscription.getId(), internalCallContext), catalogService.getFullCatalog());
+ cancelAddOnsIfRequired(subscription, effectiveDate, internalCallContext);
+
return (policy == ActionPolicy.IMMEDIATE);
}
+
+ public int cancelAddOnsIfRequired(final SubscriptionData baseSubscription, final DateTime effectiveDate, final InternalCallContext context) {
+
+ // If cancellation/change occur in the future, there is nothing to do
+ final DateTime now = clock.getUTCNow();
+ if (effectiveDate.compareTo(now) > 0) {
+ return 0;
+ }
+
+ final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
+
+ final List<Subscription> subscriptions = dao.getSubscriptions(baseSubscription.getBundleId(), context);
+
+ final List<SubscriptionData> subscriptionsToBeCancelled = new LinkedList<SubscriptionData>();
+ final List<EntitlementEvent> cancelEvents = new LinkedList<EntitlementEvent>();
+
+ for (final Subscription subscription : subscriptions) {
+ final SubscriptionData cur = (SubscriptionData) subscription;
+ if (cur.getState() == SubscriptionState.CANCELLED ||
+ cur.getCategory() != ProductCategory.ADD_ON) {
+ continue;
+ }
+
+ final Plan addonCurrentPlan = cur.getCurrentPlan();
+ if (baseProduct == null ||
+ addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
+ !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
+ //
+ // Perform AO cancellation using the effectiveDate of the BP
+ //
+ final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
+ .setSubscriptionId(cur.getId())
+ .setActiveVersion(cur.getActiveVersion())
+ .setProcessedDate(now)
+ .setEffectiveDate(effectiveDate)
+ .setRequestedDate(now)
+ .setUserToken(context.getUserToken())
+ .setFromDisk(true));
+ subscriptionsToBeCancelled.add(cur);
+ cancelEvents.add(cancelEvent);
+ }
+ }
+
+ dao.cancelSubscriptions(subscriptionsToBeCancelled, cancelEvents, context);
+ return subscriptionsToBeCancelled.size();
+ }
+
private void validateRequestedDate(final SubscriptionData subscription, final DateTime now, final DateTime requestedDate)
throws EntitlementUserApiException {
@@ -364,7 +423,7 @@ public class DefaultSubscriptionApiService implements SubscriptionApiService {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_FUTURE_DATE, requestedDate.toString());
}
- final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
+ final SubscriptionTransition previousTransition = subscription.getPreviousTransition();
if (previousTransition != null && previousTransition.getEffectiveTransitionTime().isAfter(requestedDate)) {
throw new EntitlementUserApiException(ErrorCode.ENT_INVALID_REQUESTED_DATE,
requestedDate.toString(), previousTransition.getEffectiveTransitionTime());
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
index 2903504..7d88024 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/SubscriptionData.java
@@ -39,7 +39,6 @@ import com.ning.billing.catalog.api.PriceList;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Kind;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.Order;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionDataIterator.TimeLimit;
@@ -85,6 +84,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
//
private LinkedList<SubscriptionTransitionData> transitions;
+ // Low level events are ONLY used for Repair APIs
+ protected List<EntitlementEvent> events;
+
+
+ public List<EntitlementEvent> getEvents() {
+ return events;
+ }
+
// Transient object never returned at the API
public SubscriptionData(final SubscriptionBuilder builder) {
this(builder, null, null);
@@ -103,6 +110,22 @@ public class SubscriptionData extends EntityBase implements Subscription {
this.paidThroughDate = builder.getPaidThroughDate();
}
+ // Used for API to make sure we have a clock and an apiService set before we return the object
+ public SubscriptionData(final SubscriptionData internalSubscription, final SubscriptionApiService apiService, final Clock clock) {
+ super(internalSubscription.getId(), internalSubscription.getCreatedDate(), internalSubscription.getUpdatedDate());
+ this.apiService = apiService;
+ this.clock = clock;
+ this.bundleId = internalSubscription.getBundleId();
+ this.alignStartDate = internalSubscription.getAlignStartDate();
+ this.bundleStartDate = internalSubscription.getBundleStartDate();
+ this.category = internalSubscription.getCategory();
+ this.activeVersion = internalSubscription.getActiveVersion();
+ this.chargedThroughDate = internalSubscription.getChargedThroughDate();
+ this.paidThroughDate = internalSubscription.getPaidThroughDate();
+ this.transitions = new LinkedList<SubscriptionTransitionData>(internalSubscription.getAllTransitions());
+ this.events = internalSubscription.getEvents();
+ }
+
@Override
public UUID getBundleId() {
return bundleId;
@@ -484,13 +507,14 @@ public class SubscriptionData extends EntityBase implements Subscription {
"Failed to find CurrentPhaseStart id = %s", getId().toString()));
}
- public void rebuildTransitions(final List<EntitlementEvent> inputEvents,
- final Catalog catalog) {
+ public void rebuildTransitions(final List<EntitlementEvent> inputEvents, final Catalog catalog) {
if (inputEvents == null) {
return;
}
+ this.events = inputEvents;
+
SubscriptionState nextState = null;
String nextPlanName = null;
String nextPhaseName = null;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 27c9eea..ce206ed 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -28,12 +28,10 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.Product;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.util.config.EntitlementConfig;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.alignment.TimedPhase;
import com.ning.billing.entitlement.api.EntitlementService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
import com.ning.billing.entitlement.api.user.Subscription;
import com.ning.billing.entitlement.api.user.Subscription.SubscriptionState;
@@ -80,29 +78,25 @@ public class Engine implements EventListener, EntitlementService {
private final PlanAligner planAligner;
private final AddonUtils addonUtils;
private final InternalBus eventBus;
- private final EntitlementConfig config;
private final NotificationQueueService notificationQueueService;
- private final SubscriptionFactory subscriptionFactory;
private final InternalCallContextFactory internalCallContextFactory;
-
private NotificationQueue subscriptionEventQueue;
+ private final SubscriptionApiService apiService;
@Inject
public Engine(final Clock clock, final EntitlementDao dao, final PlanAligner planAligner,
- final EntitlementConfig config,
final AddonUtils addonUtils, final InternalBus eventBus,
final NotificationQueueService notificationQueueService,
- final SubscriptionFactory subscriptionFactory,
- final InternalCallContextFactory internalCallContextFactory) {
+ final InternalCallContextFactory internalCallContextFactory,
+ final SubscriptionApiService apiService) {
this.clock = clock;
this.dao = dao;
this.planAligner = planAligner;
this.addonUtils = addonUtils;
- this.config = config;
this.eventBus = eventBus;
this.notificationQueueService = notificationQueueService;
- this.subscriptionFactory = subscriptionFactory;
this.internalCallContextFactory = internalCallContextFactory;
+ this.apiService = apiService;
}
@Override
@@ -115,7 +109,7 @@ public class Engine implements EventListener, EntitlementService {
try {
final NotificationQueueHandler queueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID fromNotificationQueueUserToken, final Long accountRecordId, final Long tenantRecordId) {
if (!(inputKey instanceof EntitlementNotificationKey)) {
log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
return;
@@ -128,28 +122,16 @@ public class Engine implements EventListener, EntitlementService {
return;
}
- final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : null;
+ // TODO STEPH?
+ final UUID userToken = (event.getType() == EventType.API_USER) ? ((ApiEvent) event).getUserToken() : fromNotificationQueueUserToken;
final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "SubscriptionEventQueue", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
processEventReady(event, key.getSeqId(), context);
}
};
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- };
-
subscriptionEventQueue = notificationQueueService.createNotificationQueue(ENTITLEMENT_SERVICE_NAME,
NOTIFICATION_QUEUE_NAME,
- queueHandler,
- notificationConfig);
+ queueHandler);
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
@@ -174,7 +156,7 @@ public class Engine implements EventListener, EntitlementService {
return;
}
- final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(subscriptionFactory, event.getSubscriptionId(), context);
+ final SubscriptionData subscription = (SubscriptionData) dao.getSubscriptionFromId(event.getSubscriptionId(), context);
if (subscription == null) {
log.warn("Failed to retrieve subscription for id %s", event.getSubscriptionId());
return;
@@ -187,7 +169,6 @@ public class Engine implements EventListener, EntitlementService {
//
// Do any internal processing on that event before we send the event to the bus
//
-
int theRealSeqId = seqId;
if (event.getType() == EventType.PHASE) {
onPhaseEvent(subscription, context);
@@ -198,7 +179,7 @@ public class Engine implements EventListener, EntitlementService {
try {
final SubscriptionTransitionData transition = (subscription.getTransitionFromEvent(event, theRealSeqId));
final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, subscription.getAlignStartDate(),
- context.getAccountRecordId(), context.getTenantRecordId());
+ context.getAccountRecordId(), context.getTenantRecordId());
eventBus.post(busEvent, context);
} catch (EventBusException e) {
log.warn("Failed to post entitlement event " + event, e);
@@ -221,47 +202,8 @@ public class Engine implements EventListener, EntitlementService {
}
private int onBasePlanEvent(final SubscriptionData baseSubscription, final ApiEvent event, final InternalCallContext context) {
- final DateTime now = clock.getUTCNow();
- final Product baseProduct = (baseSubscription.getState() == SubscriptionState.CANCELLED) ? null : baseSubscription.getCurrentPlan().getProduct();
-
- final List<Subscription> subscriptions = dao.getSubscriptions(subscriptionFactory, baseSubscription.getBundleId(), context);
-
- final Map<UUID, EntitlementEvent> addOnCancellations = new HashMap<UUID, EntitlementEvent>();
- final Map<UUID, SubscriptionData> addOnCancellationSubscriptions = new HashMap<UUID, SubscriptionData>();
- for (final Subscription subscription : subscriptions) {
- final SubscriptionData cur = (SubscriptionData) subscription;
- if (cur.getState() == SubscriptionState.CANCELLED ||
- cur.getCategory() != ProductCategory.ADD_ON) {
- continue;
- }
+ return apiService.cancelAddOnsIfRequired(baseSubscription, event.getEffectiveDate(), context);
+ }
- final Plan addonCurrentPlan = cur.getCurrentPlan();
- if (baseProduct == null ||
- addonUtils.isAddonIncluded(baseProduct, addonCurrentPlan) ||
- !addonUtils.isAddonAvailable(baseProduct, addonCurrentPlan)) {
- //
- // Perform AO cancellation using the effectiveDate of the BP
- //
- final EntitlementEvent cancelEvent = new ApiEventCancel(new ApiEventBuilder()
- .setSubscriptionId(cur.getId())
- .setActiveVersion(cur.getActiveVersion())
- .setProcessedDate(now)
- .setEffectiveDate(event.getEffectiveDate())
- .setRequestedDate(now)
- .setUserToken(context.getUserToken())
- .setFromDisk(true));
- addOnCancellations.put(cur.getId(), cancelEvent);
- addOnCancellationSubscriptions.put(cur.getId(), cur);
- }
- }
-
- final int addOnSize = addOnCancellations.size();
- int cancelSeq = addOnSize - 1;
- for (final UUID key : addOnCancellations.keySet()) {
- dao.cancelSubscription(addOnCancellationSubscriptions.get(key), addOnCancellations.get(key), context, cancelSeq);
- cancelSeq--;
- }
- return addOnSize;
- }
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
index aecdd99..7f077ce 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/DefaultEntitlementDao.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 Ning, Inc.
+ * Copyright 2010-2012 Ning, Inc.
*
* Ning licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -39,19 +40,20 @@ import com.ning.billing.ErrorCode;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.DefaultRepairEntitlementEvent;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
import com.ning.billing.entitlement.api.transfer.TransferCancelData;
+import com.ning.billing.entitlement.api.user.DefaultEffectiveSubscriptionEvent;
import com.ning.billing.entitlement.api.user.DefaultRequestedSubscriptionEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
+import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.core.Engine;
import com.ning.billing.entitlement.engine.core.EntitlementNotificationKey;
@@ -76,6 +78,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.events.EffectiveSubscriptionInternalEvent;
import com.ning.billing.util.events.RepairEntitlementInternalEvent;
import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
@@ -98,6 +101,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
private final NotificationQueueService notificationQueueService;
private final AddonUtils addonUtils;
private final InternalBus eventBus;
+ private final CatalogService catalogService;
@Inject
public DefaultEntitlementDao(final IDBI dbi, final Clock clock, final AddonUtils addonUtils,
@@ -107,6 +111,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
this.notificationQueueService = notificationQueueService;
this.addonUtils = addonUtils;
this.eventBus = eventBus;
+ this.catalogService = catalogService;
}
@Override
@@ -206,29 +211,26 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
- return getBaseSubscription(factory, bundleId, true, context);
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
+ return getBaseSubscription(bundleId, true, context);
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
- return buildSubscription(factory, getSubscriptionFromId(subscriptionId, context), context);
- }
-
- private Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
- return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
+ final Subscription shellSubscription = transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Subscription>() {
@Override
public Subscription inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
final SubscriptionModelDao model = entitySqlDaoWrapperFactory.become(SubscriptionSqlDao.class).getById(subscriptionId.toString(), context);
return SubscriptionModelDao.toSubscription(model);
}
});
+ return buildSubscription(shellSubscription, context);
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
- return buildBundleSubscriptions(bundleId, factory, getSubscriptionFromBundleId(bundleId, context), context);
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
+ return buildBundleSubscriptions(bundleId, getSubscriptionFromBundleId(bundleId, context), context);
}
private List<Subscription> getSubscriptionFromBundleId(final UUID bundleId, final InternalTenantContext context) {
@@ -248,7 +250,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
final String bundleKey, final InternalTenantContext context) {
return transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<List<Subscription>>() {
@Override
@@ -257,7 +259,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
if (bundleModel == null) {
return Collections.emptyList();
}
- return getSubscriptions(factory, bundleModel.getId(), context);
+ return getSubscriptions(bundleModel.getId(), context);
}
});
}
@@ -389,10 +391,10 @@ public class DefaultEntitlementDao implements EntitlementDao {
final EntitlementEventSqlDao eventsDaoFromSameTransaction = entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class);
for (final EntitlementEvent cur : initialEvents) {
eventsDaoFromSameTransaction.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
+
}
// Notify the Bus of the latest requested change, if needed
if (initialEvents.size() > 0) {
@@ -403,6 +405,8 @@ public class DefaultEntitlementDao implements EntitlementDao {
});
}
+
+
@Override
public void recreateSubscription(final SubscriptionData subscription, final List<EntitlementEvent> recreateEvents, final InternalCallContext context) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -412,11 +416,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
for (final EntitlementEvent cur : recreateEvents) {
transactional.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
}
// Notify the Bus of the latest requested change
@@ -428,6 +430,24 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ for (int i = 0; i < subscriptions.size(); i++) {
+ final SubscriptionData subscription = subscriptions.get(i);
+ final EntitlementEvent cancelEvent = cancelEvents.get(i);
+ cancelSubscriptionFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, context, i);
+ }
+ return null;
+ }
+ });
+ }
+
+
+
+ @Override
public void cancelSubscription(final SubscriptionData subscription, final EntitlementEvent cancelEvent, final InternalCallContext context, final int seqId) {
transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
@@ -498,10 +518,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
for (final EntitlementEvent cur : changeEventsTweakedWithMigrateBilling) {
transactional.create(new EntitlementEventModelDao(cur), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cur.getEffectiveDate(),
- new EntitlementNotificationKey(cur.getId()),
- context);
+
+ final boolean isBusEvent = cur.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0 && (cur.getType() == EventType.API_USER);
+ recordBusOrFutureNotificationFromTransaction(subscription, cur, entitySqlDaoWrapperFactory, isBusEvent, 0, context);
}
// Notify the Bus of the latest requested change
@@ -608,10 +627,9 @@ public class DefaultEntitlementDao implements EntitlementDao {
final UUID subscriptionId = subscription.getId();
cancelFutureEventsFromTransaction(subscriptionId, entitySqlDaoWrapperFactory, context);
entitySqlDaoWrapperFactory.become(EntitlementEventSqlDao.class).create(new EntitlementEventModelDao(cancelEvent), context);
- recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
- cancelEvent.getEffectiveDate(),
- new EntitlementNotificationKey(cancelEvent.getId(), seqId),
- context);
+
+ final boolean isBusEvent = cancelEvent.getEffectiveDate().compareTo(clock.getUTCNow()) <= 0;
+ recordBusOrFutureNotificationFromTransaction(subscription, cancelEvent, entitySqlDaoWrapperFactory, isBusEvent, seqId, context);
// Notify the Bus of the requested change
notifyBusOfRequestedChange(entitySqlDaoWrapperFactory, subscription, cancelEvent, context);
@@ -663,14 +681,14 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
}
- private Subscription buildSubscription(final SubscriptionFactory factory, final Subscription input, final InternalTenantContext context) {
+ private Subscription buildSubscription(final Subscription input, final InternalTenantContext context) {
if (input == null) {
return null;
}
final List<Subscription> bundleInput = new ArrayList<Subscription>();
if (input.getCategory() == ProductCategory.ADD_ON) {
- final Subscription baseSubscription = getBaseSubscription(factory, input.getBundleId(), false, context);
+ final Subscription baseSubscription = getBaseSubscription(input.getBundleId(), false, context);
if (baseSubscription == null) {
return null;
}
@@ -681,7 +699,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
bundleInput.add(input);
}
- final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), factory, bundleInput, context);
+ final List<Subscription> reloadedSubscriptions = buildBundleSubscriptions(input.getBundleId(), bundleInput, context);
for (final Subscription cur : reloadedSubscriptions) {
if (cur.getId().equals(input.getId())) {
return cur;
@@ -691,7 +709,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
throw new EntitlementError("Unexpected code path in buildSubscription");
}
- private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final SubscriptionFactory factory, final List<Subscription> input, final InternalTenantContext context) {
+ private List<Subscription> buildBundleSubscriptions(final UUID bundleId, final List<Subscription> input, final InternalTenantContext context) {
if (input == null || input.size() == 0) {
return Collections.emptyList();
}
@@ -714,7 +732,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
final List<Subscription> result = new ArrayList<Subscription>(input.size());
for (final Subscription cur : input) {
final List<EntitlementEvent> events = getEventsForSubscription(cur.getId(), context);
- Subscription reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ Subscription reloaded = createSubscriptionForInternalUse(cur, events);
switch (cur.getCategory()) {
case BASE:
@@ -752,7 +770,7 @@ public class DefaultEntitlementDao implements EntitlementDao {
events.add(addOnCancelEvent);
// Finally reload subscription with full set of events
- reloaded = factory.createSubscription(new SubscriptionBuilder((SubscriptionData) cur), events);
+ reloaded = createSubscriptionForInternalUse(cur, events);
}
break;
default:
@@ -838,26 +856,58 @@ public class DefaultEntitlementDao implements EntitlementDao {
});
}
- private Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
+ private SubscriptionData createSubscriptionForInternalUse(final Subscription shellSubscription, final List<EntitlementEvent> events) {
+ final SubscriptionData result = new SubscriptionData(new SubscriptionBuilder(((SubscriptionData) shellSubscription)), null, clock);
+ if (events.size() > 0) {
+ result.rebuildTransitions(events, catalogService.getFullCatalog());
+ }
+ return result;
+ }
+
+ private Subscription getBaseSubscription(final UUID bundleId, final boolean rebuildSubscription, final InternalTenantContext context) {
final List<Subscription> subscriptions = getSubscriptionFromBundleId(bundleId, context);
for (final Subscription cur : subscriptions) {
if (cur.getCategory() == ProductCategory.BASE) {
- return rebuildSubscription ? buildSubscription(factory, cur, context) : cur;
+ return rebuildSubscription ? buildSubscription(cur, context) : cur;
}
}
return null;
}
- private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
- final NotificationKey notificationKey, final InternalCallContext context) {
+
+ //
+ // Either records a notfication or sends a bus event is operation is immediate
+ //
+ private void recordBusOrFutureNotificationFromTransaction(final SubscriptionData subscription, final EntitlementEvent event, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final boolean busEvent,
+ final int seqId, final InternalCallContext context) {
+ if (busEvent) {
+ notifyBusOfEffectiveImmediateChange(entitySqlDaoWrapperFactory, subscription, event, seqId, context);
+ } else {
+ recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory,
+ event.getEffectiveDate(),
+ new EntitlementNotificationKey(event.getId()),
+ context);
+ }
+ }
+
+ //
+ // Sends bus notification for event on effecfive date-- only used for operation that happen immediately:
+ // - CREATE,
+ // - IMM CANCEL or CHANGE
+ //
+ private void notifyBusOfEffectiveImmediateChange(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final SubscriptionData subscription,
+ final EntitlementEvent immediateEvent, final int seqId, final InternalCallContext context) {
try {
- final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
- Engine.NOTIFICATION_QUEUE_NAME);
- subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, null, notificationKey, context);
- } catch (NoSuchNotificationQueue e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ final SubscriptionData upToDateSubscription = createSubscriptionWithNewEvent(subscription, immediateEvent);
+
+ final SubscriptionTransitionData transition = upToDateSubscription.getTransitionFromEvent(immediateEvent, seqId);
+ final EffectiveSubscriptionInternalEvent busEvent = new DefaultEffectiveSubscriptionEvent(transition, upToDateSubscription.getAlignStartDate(),
+ context.getAccountRecordId(), context.getTenantRecordId());
+
+
+ eventBus.postFromTransaction(busEvent, entitySqlDaoWrapperFactory, context);
+ } catch (EventBusException e) {
+ log.warn("Failed to post effective event for subscription " + subscription.getId(), e);
}
}
@@ -870,6 +920,18 @@ public class DefaultEntitlementDao implements EntitlementDao {
}
}
+ private void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final DateTime effectiveDate,
+ final NotificationKey notificationKey, final InternalCallContext context) {
+ try {
+ final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
+ Engine.NOTIFICATION_QUEUE_NAME);
+ subscriptionEventQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, effectiveDate, notificationKey, context);
+ } catch (NoSuchNotificationQueue e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
private void migrateBundleDataFromTransaction(final BundleMigrationData bundleTransferData, final EntitlementEventSqlDao transactional,
final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws EntityPersistenceException {
@@ -902,4 +964,20 @@ public class DefaultEntitlementDao implements EntitlementDao {
transBundleDao.create(new SubscriptionBundleModelDao(bundleData), context);
}
+
+ //
+ // Creates a copy of the existing subscriptions whose 'transitions' will reflect the new event
+ //
+ private SubscriptionData createSubscriptionWithNewEvent(final SubscriptionData subscription, EntitlementEvent newEvent) {
+
+ final SubscriptionData subscriptionWithNewEvent = new SubscriptionData(subscription, null, clock);
+ final List<EntitlementEvent> allEvents = new LinkedList<EntitlementEvent>();
+ if (subscriptionWithNewEvent.getEvents() != null) {
+ allEvents.addAll(subscriptionWithNewEvent.getEvents());
+ }
+ allEvents.add(newEvent);
+ subscriptionWithNewEvent.rebuildTransitions(allEvents, catalogService.getFullCatalog());
+ return subscriptionWithNewEvent;
+ }
+
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
index 1c17929..2b83ab2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementDao.java
@@ -20,7 +20,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
@@ -46,17 +45,17 @@ public interface EntitlementDao {
public SubscriptionBundle createSubscriptionBundle(SubscriptionBundleData bundle, InternalCallContext context);
- public Subscription getSubscriptionFromId(SubscriptionFactory factory, UUID subscriptionId, InternalTenantContext context);
+ public Subscription getSubscriptionFromId(UUID subscriptionId, InternalTenantContext context);
// ACCOUNT retrieval
public UUID getAccountIdFromSubscriptionId(UUID subscriptionId, InternalTenantContext context);
// Subscription retrieval
- public Subscription getBaseSubscription(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+ public Subscription getBaseSubscription(UUID bundleId, InternalTenantContext context);
- public List<Subscription> getSubscriptions(SubscriptionFactory factory, UUID bundleId, InternalTenantContext context);
+ public List<Subscription> getSubscriptions(UUID bundleId, InternalTenantContext context);
- public List<Subscription> getSubscriptionsForAccountAndKey(SubscriptionFactory factory, UUID accountId, String bundleKey, InternalTenantContext context);
+ public List<Subscription> getSubscriptionsForAccountAndKey(UUID accountId, String bundleKey, InternalTenantContext context);
// Update
public void updateChargedThroughDate(SubscriptionData subscription, InternalCallContext context);
@@ -79,6 +78,8 @@ public interface EntitlementDao {
public void cancelSubscription(SubscriptionData subscription, EntitlementEvent cancelEvent, InternalCallContext context, int cancelSeq);
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context);
+
public void uncancelSubscription(SubscriptionData subscription, List<EntitlementEvent> uncancelEvents, InternalCallContext context);
public void changePlan(SubscriptionData subscription, List<EntitlementEvent> changeEvents, InternalCallContext context);
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
index 0f0e18d..b208f18 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/model/SubscriptionModelDao.java
@@ -21,8 +21,8 @@ import java.util.UUID;
import org.joda.time.DateTime;
import com.ning.billing.catalog.api.ProductCategory;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.util.dao.TableName;
import com.ning.billing.util.entity.EntityBase;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
index b4d3899..38695ac 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/RepairEntitlementDao.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -176,6 +175,10 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+ }
+
+ @Override
public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
addEvents(subscription.getId(), changeEvents);
}
@@ -228,7 +231,7 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@@ -238,17 +241,17 @@ public class RepairEntitlementDao implements EntitlementDao, RepairEntitlementLi
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId,
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId,
final String bundleKey, final InternalTenantContext context) {
throw new EntitlementError(NOT_IMPLEMENTED);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
index 459cae7..f3dc0b9 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/glue/DefaultEntitlementModule.java
@@ -24,7 +24,6 @@ import com.ning.billing.entitlement.alignment.MigrationPlanAligner;
import com.ning.billing.entitlement.alignment.PlanAligner;
import com.ning.billing.entitlement.api.EntitlementService;
import com.ning.billing.entitlement.api.SubscriptionApiService;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.DefaultEntitlementMigrationApi;
import com.ning.billing.entitlement.api.migration.EntitlementMigrationApi;
import com.ning.billing.entitlement.api.svcs.DefaultEntitlementInternalApi;
@@ -32,12 +31,10 @@ import com.ning.billing.entitlement.api.timeline.DefaultEntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
import com.ning.billing.entitlement.api.timeline.RepairSubscriptionApiService;
-import com.ning.billing.entitlement.api.timeline.RepairSubscriptionFactory;
import com.ning.billing.entitlement.api.transfer.DefaultEntitlementTransferApi;
import com.ning.billing.entitlement.api.transfer.EntitlementTransferApi;
import com.ning.billing.entitlement.api.user.DefaultEntitlementUserApi;
import com.ning.billing.entitlement.api.user.DefaultSubscriptionApiService;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApi;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
import com.ning.billing.entitlement.engine.core.Engine;
@@ -68,9 +65,6 @@ public class DefaultEntitlementModule extends AbstractModule implements Entitlem
protected void installEntitlementCore() {
- bind(SubscriptionFactory.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionFactory.class).asEagerSingleton();
- bind(SubscriptionFactory.class).to(DefaultSubscriptionFactory.class).asEagerSingleton();
-
bind(SubscriptionApiService.class).annotatedWith(Names.named(REPAIR_NAMED)).to(RepairSubscriptionApiService.class).asEagerSingleton();
bind(SubscriptionApiService.class).to(DefaultSubscriptionApiService.class).asEagerSingleton();
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
index 7a3ceaf..38c949f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/alignment/TestPlanAligner.java
@@ -33,8 +33,8 @@ import com.ning.billing.catalog.api.PhaseType;
import com.ning.billing.catalog.api.Plan;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.io.VersionedCatalogLoader;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.util.config.CatalogConfig;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory;
import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.api.user.SubscriptionTransitionData;
@@ -164,7 +164,7 @@ public class TestPlanAligner extends KillbillTestSuite {
}
private SubscriptionData createSubscriptionStartedInThePast(final String productName, final PhaseType phaseType) {
- final DefaultSubscriptionFactory.SubscriptionBuilder builder = new DefaultSubscriptionFactory.SubscriptionBuilder();
+ final SubscriptionBuilder builder = new SubscriptionBuilder();
builder.setBundleStartDate(clock.getUTCNow().minusHours(10));
// Make sure to set the dates apart
builder.setAlignStartDate(new DateTime(builder.getBundleStartDate().plusHours(5)));
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 0b95f47..bd05062 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -275,6 +275,10 @@ public abstract class TestApiBase extends EntitlementTestSuiteWithEmbeddedDB imp
new PlanPhaseSpecifier(productName, ProductCategory.BASE, term, planSet, null),
requestedDate == null ? clock.getUTCNow() : requestedDate, callContext);
assertNotNull(subscription);
+
+
+ //try {Thread.sleep(100000000); } catch (Exception e) {};
+
assertTrue(testListener.isCompleted(5000));
return subscription;
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
index db3df23..8052a4f 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/timeline/TestRepairBP.java
@@ -342,7 +342,7 @@ public class TestRepairBP extends TestApiBaseRepair {
assertEquals(dryRunBaseSubscription.getActiveVersion(), SubscriptionEvents.INITIAL_VERSION);
assertEquals(dryRunBaseSubscription.getBundleId(), bundle.getId());
- assertEquals(dryRunBaseSubscription.getStartDate(), baseSubscription.getStartDate());
+ assertTrue(dryRunBaseSubscription.getStartDate().compareTo(baseSubscription.getStartDate()) == 0);
Plan currentPlan = dryRunBaseSubscription.getCurrentPlan();
assertNotNull(currentPlan);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
index 2f21fe2..cf792cc 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/transfer/TestDefaultEntitlementTransferApi.java
@@ -35,11 +35,11 @@ import com.ning.billing.catalog.api.PlanPhaseSpecifier;
import com.ning.billing.catalog.api.PriceListSet;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.entitlement.EntitlementTestSuite;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
+import com.ning.billing.entitlement.api.SubscriptionApiService;
import com.ning.billing.entitlement.api.SubscriptionTransitionType;
import com.ning.billing.entitlement.api.timeline.EntitlementTimelineApi;
import com.ning.billing.entitlement.api.timeline.SubscriptionTimeline.ExistingEvent;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionData;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.events.EntitlementEvent;
@@ -63,10 +63,10 @@ public class TestDefaultEntitlementTransferApi extends EntitlementTestSuite {
public void setUp() throws Exception {
final EntitlementDao dao = Mockito.mock(EntitlementDao.class);
final CatalogService catalogService = new MockCatalogService(new MockCatalog());
- final SubscriptionFactory subscriptionFactory = Mockito.mock(SubscriptionFactory.class);
+ final SubscriptionApiService apiService = Mockito.mock(SubscriptionApiService.class);
final EntitlementTimelineApi timelineApi = Mockito.mock(EntitlementTimelineApi.class);
final InternalCallContextFactory internalCallContextFactory = new InternalCallContextFactory(Mockito.mock(IDBI.class), clock);
- transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, subscriptionFactory, internalCallContextFactory);
+ transferApi = new DefaultEntitlementTransferApi(clock, dao, timelineApi, catalogService, apiService, internalCallContextFactory);
}
@Test(groups = "fast")
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
index b73c1e3..a7f3191 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoMemory.java
@@ -33,14 +33,13 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.catalog.api.ProductCategory;
import com.ning.billing.catalog.api.TimeUnit;
-import com.ning.billing.entitlement.api.SubscriptionFactory;
import com.ning.billing.entitlement.api.migration.AccountMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.BundleMigrationData;
import com.ning.billing.entitlement.api.migration.AccountMigrationData.SubscriptionMigrationData;
import com.ning.billing.entitlement.api.timeline.SubscriptionDataRepair;
import com.ning.billing.entitlement.api.transfer.TransferCancelData;
-import com.ning.billing.entitlement.api.user.DefaultSubscriptionFactory.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.Subscription;
+import com.ning.billing.entitlement.api.user.SubscriptionBuilder;
import com.ning.billing.entitlement.api.user.SubscriptionBundle;
import com.ning.billing.entitlement.api.user.SubscriptionBundleData;
import com.ning.billing.entitlement.api.user.SubscriptionData;
@@ -141,10 +140,10 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public Subscription getSubscriptionFromId(final SubscriptionFactory factory, final UUID subscriptionId, final InternalTenantContext context) {
+ public Subscription getSubscriptionFromId(final UUID subscriptionId, final InternalTenantContext context) {
for (final Subscription cur : subscriptions) {
if (cur.getId().equals(subscriptionId)) {
- return buildSubscription(factory, (SubscriptionData) cur, context);
+ return buildSubscription((SubscriptionData) cur, context);
}
}
return null;
@@ -156,11 +155,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public List<Subscription> getSubscriptionsForAccountAndKey(final SubscriptionFactory factory, final UUID accountId, final String bundleKey, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final InternalTenantContext context) {
for (final SubscriptionBundle cur : bundles) {
if (cur.getExternalKey().equals(bundleKey) && cur.getAccountId().equals(bundleKey)) {
- return getSubscriptions(factory, cur.getId(), context);
+ return getSubscriptions(cur.getId(), context);
}
}
return Collections.emptyList();
@@ -175,7 +174,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
recordFutureNotificationFromTransaction(null, cur.getEffectiveDate(), new EntitlementNotificationKey(cur.getId()), context);
}
}
- final Subscription updatedSubscription = buildSubscription(null, subscription, context);
+ final Subscription updatedSubscription = buildSubscription(subscription, context);
subscriptions.add(updatedSubscription);
}
@@ -190,11 +189,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public List<Subscription> getSubscriptions(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public List<Subscription> getSubscriptions(final UUID bundleId, final InternalTenantContext context) {
final List<Subscription> results = new ArrayList<Subscription>();
for (final Subscription cur : subscriptions) {
if (cur.getBundleId().equals(bundleId)) {
- results.add(buildSubscription(factory, (SubscriptionData) cur, context));
+ results.add(buildSubscription((SubscriptionData) cur, context));
}
}
return results;
@@ -229,11 +228,11 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
- public Subscription getBaseSubscription(final SubscriptionFactory factory, final UUID bundleId, final InternalTenantContext context) {
+ public Subscription getBaseSubscription(final UUID bundleId, final InternalTenantContext context) {
for (final Subscription cur : subscriptions) {
if (cur.getBundleId().equals(bundleId) &&
cur.getCurrentPlan().getProduct().getCategory() == ProductCategory.BASE) {
- return buildSubscription(factory, (SubscriptionData) cur, context);
+ return buildSubscription((SubscriptionData) cur, context);
}
}
return null;
@@ -245,16 +244,13 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
insertEvent(nextPhase, context);
}
- private Subscription buildSubscription(final SubscriptionFactory factory, final SubscriptionData in, final InternalTenantContext context) {
- if (factory != null) {
- return factory.createSubscription(new SubscriptionBuilder(in), getEventsForSubscription(in.getId(), context));
- } else {
+ private Subscription buildSubscription(final SubscriptionData in, final InternalTenantContext context) {
final SubscriptionData subscription = new SubscriptionData(new SubscriptionBuilder(in), null, clock);
if (events.size() > 0) {
subscription.rebuildTransitions(getEventsForSubscription(in.getId(), context), catalogService.getFullCatalog());
}
return subscription;
- }
+
}
@Override
@@ -284,6 +280,15 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
@Override
+ public void cancelSubscriptions(final List<SubscriptionData> subscriptions, final List<EntitlementEvent> cancelEvents, final InternalCallContext context) {
+ synchronized (events) {
+ for (int i = 0; i < subscriptions.size(); i++) {
+ cancelSubscription(subscriptions.get(i), cancelEvents.get(i), context, 0);
+ }
+ }
+ }
+
+ @Override
public void changePlan(final SubscriptionData subscription, final List<EntitlementEvent> changeEvents, final InternalCallContext context) {
synchronized (events) {
cancelNextChangeEvent(subscription.getId());
@@ -303,7 +308,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
}
private void cancelNextPhaseEvent(final UUID subscriptionId, final InternalTenantContext context) {
- final Subscription curSubscription = getSubscriptionFromId(null, subscriptionId, context);
+ final Subscription curSubscription = getSubscriptionFromId(subscriptionId, context);
if (curSubscription.getCurrentPhase() == null ||
curSubscription.getCurrentPhase().getDuration().getUnit() == TimeUnit.UNLIMITED) {
return;
@@ -412,7 +417,7 @@ public class MockEntitlementDaoMemory implements EntitlementDao {
try {
final NotificationQueue subscriptionEventQueue = notificationQueueService.getNotificationQueue(Engine.ENTITLEMENT_SERVICE_NAME,
Engine.NOTIFICATION_QUEUE_NAME);
- subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, null, notificationKey, context);
+ subscriptionEventQueue.recordFutureNotificationFromTransaction(transactionalDao, effectiveDate, notificationKey, context);
} catch (NoSuchNotificationQueue e) {
throw new RuntimeException(e);
} catch (IOException e) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index fff17a9..a580f01 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -17,6 +17,7 @@
package com.ning.billing.entitlement.glue;
import org.mockito.Mockito;
+import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.IDBI;
import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
@@ -28,6 +29,7 @@ import com.ning.billing.util.callcontext.MockCallContextSqlDao;
import com.ning.billing.util.glue.BusModule;
import com.ning.billing.util.glue.BusModule.BusType;
import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.name.Names;
@@ -44,6 +46,12 @@ public class MockEngineModuleMemory extends MockEngineModule {
private void installNotificationQueue() {
bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
+ }
+
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
}
protected void installDBI() {
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index fa5da08..3f43198 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -305,6 +305,11 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
return generator.generateInvoice(account, invoice, manualPay);
}
+ @Override
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final CallContext context) {
+ dao.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, internalCallContextFactory.createInternalCallContext(accountId, context));
+ }
+
private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final InternalCallContext context) {
try {
eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId()), context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index e928a24..f5aa44e 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -203,7 +203,7 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
// Now we check whether we generated any credit that could be used on some unpaid invoices
useExistingCBAFromTransaction(invoice.getAccountId(), entitySqlDaoWrapperFactory, context);
- notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions);
+ notifyOfFutureBillingEvents(entitySqlDaoWrapperFactory, invoice.getAccountId(), callbackDateTimePerSubscriptions, context.getUserToken());
// Create associated payments
final InvoicePaymentSqlDao invoicePaymentSqlDao = entitySqlDaoWrapperFactory.become(InvoicePaymentSqlDao.class);
@@ -791,6 +791,16 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
});
}
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+ transactionalSqlDao.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ @Override
+ public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ useExistingCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
+ return null;
+ }
+ });
+ }
+
private void useExistingCBAFromTransaction(final UUID accountId, final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final InternalCallContext context) throws InvoiceApiException, EntityPersistenceException {
final BigDecimal accountCBA = getAccountCBAFromTransaction(accountId, entitySqlDaoWrapperFactory, context);
@@ -983,10 +993,11 @@ public class DefaultInvoiceDao extends EntityDaoBase<InvoiceModelDao, Invoice, I
invoice.addPayments(invoicePayments);
}
- private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final Map<UUID, DateTime> callbackDateTimePerSubscriptions) {
+ private void notifyOfFutureBillingEvents(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+ final Map<UUID, DateTime> callbackDateTimePerSubscriptions, final UUID userToken) {
for (final UUID subscriptionId : callbackDateTimePerSubscriptions.keySet()) {
final DateTime callbackDateTimeUTC = callbackDateTimePerSubscriptions.get(subscriptionId);
- nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC);
+ nextBillingDatePoster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, callbackDateTimeUTC, userToken);
}
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
index dbaa474..3e76427 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/InvoiceDao.java
@@ -163,4 +163,11 @@ public interface InvoiceDao {
void deleteCBA(UUID accountId, UUID invoiceId, UUID invoiceItemId, InternalCallContext context) throws InvoiceApiException;
void notifyOfPayment(InvoicePaymentModelDao invoicePayment, InternalCallContext context);
+
+ /**
+ *
+ * @param accountId the account for which we need to rebalance the CBA
+ * @param context the callContext
+ */
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
index 25bb819..74914d7 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceListener.java
@@ -73,9 +73,9 @@ public class InvoiceListener {
}
}
- public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
try {
- final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "Next Billing Date", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
dispatcher.processSubscription(subscriptionId, eventDateTime, context);
} catch (InvoiceApiException e) {
log.error(e.getMessage());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 6e3a8d7..464376d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -68,21 +68,10 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
@Override
public void initialize() throws NotificationQueueAlreadyExists {
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
- };
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
try {
if (!(notificationKey instanceof NextBillingDateNotificationKey)) {
log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
@@ -95,7 +84,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
if (subscription == null) {
log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")");
} else {
- processEvent(key.getUuidKey(), eventDate, accountRecordId, tenantRecordId);
+ processEvent(key.getUuidKey(), eventDate, userToken, accountRecordId, tenantRecordId);
}
} catch (EntitlementUserApiException e) {
log.warn("Next Billing Date Notification Queue handled spurious notification (key: " + key + ")", e);
@@ -108,8 +97,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
nextBillingQueue = notificationQueueService.createNotificationQueue(DefaultInvoiceService.INVOICE_SERVICE_NAME,
NEXT_BILLING_DATE_NOTIFIER_QUEUE,
- notificationQueueHandler,
- notificationConfig);
+ notificationQueueHandler);
}
@Override
@@ -125,7 +113,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
}
}
- private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, accountRecordId, tenantRecordId);
+ private void processEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ listener.handleNextBillingDateEvent(subscriptionId, eventDateTime, userToken, accountRecordId, tenantRecordId);
}
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
index f923aff..c17af59 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDatePoster.java
@@ -52,8 +52,8 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
@Override
public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
- final UUID subscriptionId, final DateTime futureNotificationTime) {
- final InternalCallContext context = createCallContext(accountId);
+ final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
+ final InternalCallContext context = createCallContext(accountId, userToken);
final NotificationQueue nextBillingQueue;
try {
@@ -61,7 +61,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
DefaultNextBillingDateNotifier.NEXT_BILLING_DATE_NOTIFIER_QUEUE);
log.info("Queuing next billing date notification at {} for subscriptionId {}", futureNotificationTime.toString(), subscriptionId.toString());
- nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime, accountId,
+ nextBillingQueue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, futureNotificationTime,
new NextBillingDateNotificationKey(subscriptionId), context);
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to put items on a non-existent queue (NextBillingDateNotifier).", e);
@@ -70,7 +70,7 @@ public class DefaultNextBillingDatePoster implements NextBillingDatePoster {
}
}
- private InternalCallContext createCallContext(final UUID accountId) {
- return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ private InternalCallContext createCallContext(final UUID accountId, final UUID userToken) {
+ return internalCallContextFactory.createInternalCallContext(accountId, "NextBillingDatePoster", CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
}
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
index 95955f7..1a8da73 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/NextBillingDatePoster.java
@@ -25,7 +25,6 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
public interface NextBillingDatePoster {
- void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
- UUID subscriptionId, DateTime futureNotificationTime);
-
+ void insertNextBillingNotification(EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, UUID accountId,
+ UUID subscriptionId, DateTime futureNotificationTime, UUID userToken);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
index d4ee2e1..43afee2 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
@@ -23,6 +23,8 @@ import java.util.UUID;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
@@ -69,6 +71,8 @@ import com.google.inject.Inject;
@Guice(modules = {MockModuleNoEntitlement.class})
public abstract class InvoiceApiTestBase extends InvoicingTestBase {
+ private static final Logger log = LoggerFactory.getLogger(InvoiceApiTestBase.class);
+
protected static final Currency accountCurrency = Currency.USD;
@Inject
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index 2949653..7559994 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -66,16 +66,6 @@ public class InvoiceDaoTestBase extends InvoicingTestBase {
private final InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getSleepTimeMs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isNotificationProcessingOff() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 36;
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 6861d13..9371dce 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -182,6 +182,10 @@ public class MockInvoiceDao implements InvoiceDao {
}
@Override
+ public void consumeExstingCBAOnAccountWithUnpaidInvoices(final UUID accountId, final InternalCallContext context) {
+ }
+
+ @Override
public BigDecimal getAccountBalance(final UUID accountId, final InternalTenantContext context) {
BigDecimal balance = BigDecimal.ZERO;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
index 5864d95..9c4087a 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGenerator.java
@@ -82,11 +82,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
final Clock clock = new DefaultClock();
final InvoiceConfig invoiceConfig = new InvoiceConfig() {
@Override
- public long getSleepTimeMs() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 36;
}
@@ -95,11 +90,6 @@ public class TestDefaultInvoiceGenerator extends InvoicingTestBase {
public boolean isEmailNotificationsEnabled() {
return false;
}
-
- @Override
- public boolean isNotificationProcessingOff() {
- throw new UnsupportedOperationException();
- }
};
this.generator = new DefaultInvoiceGenerator(clock, invoiceConfig);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
index 1fd99d3..c5c7168 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/generator/TestDefaultInvoiceGeneratorUnit.java
@@ -67,21 +67,11 @@ public class TestDefaultInvoiceGeneratorUnit extends InvoicingTestBase {
clock = new ClockMock();
gen = new TestDefaultInvoiceGeneratorMock(clock, new InvoiceConfig() {
@Override
- public boolean isNotificationProcessingOff() {
- return false;
- }
-
- @Override
public boolean isEmailNotificationsEnabled() {
return false;
}
@Override
- public long getSleepTimeMs() {
- return 100;
- }
-
- @Override
public int getNumberOfMonthsInFuture() {
return 5;
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
index 6b77d80..54091f5 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/MockNextBillingDatePoster.java
@@ -26,6 +26,7 @@ import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
public class MockNextBillingDatePoster implements NextBillingDatePoster {
@Override
- public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId, final UUID subscriptionId, final DateTime futureNotificationTime) {
+ public void insertNextBillingNotification(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory, final UUID accountId,
+ final UUID subscriptionId, final DateTime futureNotificationTime, final UUID userToken) {
}
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 4ea0e9d..15d08b6 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -92,8 +92,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB
}
@Override
- public void handleNextBillingDateEvent(final UUID subscriptionId,
- final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleNextBillingDateEvent(final UUID subscriptionId, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
eventCount++;
latestSubscriptionId = subscriptionId;
}
@@ -182,7 +181,7 @@ public class TestNextBillingDateNotifier extends InvoiceTestSuiteWithEmbeddedDB
entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
- poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime);
+ poster.insertNextBillingNotification(entitySqlDaoWrapperFactory, accountId, subscriptionId, readyTime, UUID.randomUUID());
return null;
}
});
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
index 0162c8f..f07276d 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
@@ -332,6 +332,27 @@ public class AccountResource extends JaxRsResourceBase {
return Response.status(Status.OK).build();
}
+
+ /*
+ * ************************** INVOICE CBA REBALANCING ********************************
+ */
+ @POST
+ @Path("/{accountId:" + UUID_PATTERN + "}/" + CBA_REBALANCING)
+ @Consumes(APPLICATION_JSON)
+ @Produces(APPLICATION_JSON)
+ public Response rebalanceExistingCBAOnAccount(@PathParam("accountId") final String accountIdString,
+ @HeaderParam(HDR_CREATED_BY) final String createdBy,
+ @HeaderParam(HDR_REASON) final String reason,
+ @HeaderParam(HDR_COMMENT) final String comment,
+ @javax.ws.rs.core.Context final HttpServletRequest request) throws AccountApiException {
+
+ final CallContext callContext = context.createContext(createdBy, reason, comment, request);
+ final UUID accountId = UUID.fromString(accountIdString);
+
+ invoiceApi.consumeExstingCBAOnAccountWithUnpaidInvoices(accountId, callContext);
+ return Response.status(Status.OK).build();
+ }
+
/*
* ************************** PAYMENTS ********************************
*/
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
index 8384cf9..7d0ec23 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/JaxrsResource.java
@@ -132,4 +132,7 @@ public interface JaxrsResource {
public static final String EXPORT = "export";
public static final String EXPORT_PATH = PREFIX + "/" + EXPORT;
+
+ public static final String CBA_REBALANCING = "cbaRebalancing";
+
}
meter/pom.xml 3(+2 -1)
diff --git a/meter/pom.xml b/meter/pom.xml
index e374441..395f26e 100644
--- a/meter/pom.xml
+++ b/meter/pom.xml
@@ -8,7 +8,8 @@
OR CONDITIONS OF ANY KIND, either express or implied. See the ~ License for
the specific language governing permissions and limitations ~ under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.ning.billing</groupId>
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index 39b7ef4..d107468 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -16,15 +16,16 @@
package com.ning.billing.ovedue.notification;
+import java.util.UUID;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.overdue.OverdueProperties;
import com.ning.billing.overdue.listener.OverdueListener;
import com.ning.billing.overdue.service.DefaultOverdueService;
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.notificationq.NotificationKey;
import com.ning.billing.util.notificationq.NotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -55,21 +56,9 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
@Override
public void initialize() {
- final NotificationConfig notificationConfig = new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return config.isNotificationProcessingOff();
- }
-
- @Override
- public long getSleepTimeMs() {
- return config.getSleepTimeMs();
- }
- };
-
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
try {
if (!(notificationKey instanceof OverdueCheckNotificationKey)) {
log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
@@ -77,7 +66,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
}
final OverdueCheckNotificationKey key = (OverdueCheckNotificationKey) notificationKey;
- listener.handleNextOverdueCheck(key, accountRecordId, tenantRecordId);
+ listener.handleNextOverdueCheck(key, userToken, accountRecordId, tenantRecordId);
} catch (IllegalArgumentException e) {
log.error("The key returned from the NextBillingNotificationQueue is not a valid UUID", e);
}
@@ -88,8 +77,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
try {
overdueQueue = notificationQueueService.createNotificationQueue(DefaultOverdueService.OVERDUE_SERVICE_NAME,
OVERDUE_CHECK_NOTIFIER_QUEUE,
- notificationQueueHandler,
- notificationConfig);
+ notificationQueueHandler);
} catch (NotificationQueueAlreadyExists e) {
throw new RuntimeException(e);
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
index ed401f1..2cb149e 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckPoster.java
@@ -52,7 +52,7 @@ public class DefaultOverdueCheckPoster implements OverdueCheckPoster {
DefaultOverdueCheckNotifier.OVERDUE_CHECK_NOTIFIER_QUEUE);
log.info("Queuing overdue check notification. id: {}, timestamp: {}", overdueable.getId().toString(), futureNotificationTime.toString());
- checkOverdueQueue.recordFutureNotification(futureNotificationTime, null, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
+ checkOverdueQueue.recordFutureNotification(futureNotificationTime, new OverdueCheckNotificationKey(overdueable.getId(), Blockable.Type.get(overdueable)), context);
} catch (NoSuchNotificationQueue e) {
log.error("Attempting to put items on a non-existent queue (DefaultOverdueCheck).", e);
} catch (IOException e) {
diff --git a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
index 86af3de..1733da5 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/listener/OverdueListener.java
@@ -67,10 +67,10 @@ public class OverdueListener {
dispatcher.processOverdueForAccount(accountId, createCallContext(event.getUserToken(), event.getAccountRecordId(), event.getTenantRecordId()));
}
- public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleNextOverdueCheck(final OverdueCheckNotificationKey notificationKey, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
log.info(String.format("Received OD checkup notification for type = %s, id = %s",
notificationKey.getType(), notificationKey.getUuidKey()));
- dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(null, accountRecordId, tenantRecordId));
+ dispatcher.processOverdue(notificationKey.getType(), notificationKey.getUuidKey(), createCallContext(userToken, accountRecordId, tenantRecordId));
}
private InternalCallContext createCallContext(final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index 084e6c6..ff3a672 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -64,7 +64,7 @@ import com.ning.billing.util.email.templates.TemplateModule;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.globallocker.MySqlGlobalLocker;
import com.ning.billing.util.glue.BusModule;
-import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.glue.NotificationQueueModule;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
@@ -80,6 +80,7 @@ import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.MINUTES;
public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
+
private Clock clock;
private DefaultOverdueCheckNotifier notifier;
@@ -88,6 +89,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
private NotificationQueueService notificationQueueService;
private static final class OverdueListenerMock extends OverdueListener {
+
int eventCount = 0;
UUID latestSubscriptionId = null;
@@ -96,7 +98,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
}
@Override
- public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleNextOverdueCheck(final OverdueCheckNotificationKey key, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
eventCount++;
latestSubscriptionId = key.getUuidKey();
}
@@ -118,7 +120,6 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
super.configure();
bind(Clock.class).to(ClockMock.class).asEagerSingleton();
bind(CallContextFactory.class).to(DefaultCallContextFactory.class).asEagerSingleton();
- bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
final InvoiceConfig invoiceConfig = new ConfigurationObjectFactory(System.getProperties()).build(InvoiceConfig.class);
bind(InvoiceConfig.class).toInstance(invoiceConfig);
final CatalogConfig catalogConfig = new ConfigurationObjectFactory(System.getProperties()).build(CatalogConfig.class);
@@ -132,7 +133,7 @@ public class TestOverdueCheckNotifier extends OverdueTestSuiteWithEmbeddedDB {
install(new MockJunctionModule());
install(new EmailModule());
install(new TemplateModule());
-
+ install(new NotificationQueueModule());
final AccountInternalApi accountApi = Mockito.mock(AccountInternalApi.class);
bind(AccountInternalApi.class).toInstance(accountApi);
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
index b45819d..5ffb060 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/AutoPayRetryService.java
@@ -39,7 +39,7 @@ public class AutoPayRetryService extends BaseRetryService implements RetryServic
final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index 2828c63..c2cda2d 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -47,16 +47,13 @@ public abstract class BaseRetryService implements RetryService {
private static final String PAYMENT_RETRY_SERVICE = "PaymentRetryService";
private final NotificationQueueService notificationQueueService;
- private final PaymentConfig config;
private final InternalCallContextFactory internalCallContextFactory;
private NotificationQueue retryQueue;
public BaseRetryService(final NotificationQueueService notificationQueueService,
- final PaymentConfig config,
final InternalCallContextFactory internalCallContextFactory) {
this.notificationQueueService = notificationQueueService;
- this.config = config;
this.internalCallContextFactory = internalCallContextFactory;
}
@@ -66,17 +63,16 @@ public abstract class BaseRetryService implements RetryService {
getQueueName(),
new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
if (!(notificationKey instanceof PaymentRetryNotificationKey)) {
log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
return;
}
final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
- final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, userToken);
retry(key.getUuidKey(), callContext);
}
- },
- config);
+ });
}
@Override
@@ -141,9 +137,9 @@ public abstract class BaseRetryService implements RetryService {
final NotificationKey key = new PaymentRetryNotificationKey(paymentId);
if (retryQueue != null) {
if (transactionalDao == null) {
- retryQueue.recordFutureNotification(timeOfRetry, null, key, context);
+ retryQueue.recordFutureNotification(timeOfRetry, key, context);
} else {
- retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, null, key, context);
+ retryQueue.recordFutureNotificationFromTransaction(transactionalDao, timeOfRetry, key, context);
}
}
} catch (NoSuchNotificationQueue e) {
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
index 250987b..c9a80a1 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/FailedPaymentRetryService.java
@@ -45,7 +45,7 @@ public class FailedPaymentRetryService extends BaseRetryService implements Retry
final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
index 9217abd..2e76ea2 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/PluginFailureRetryService.java
@@ -43,10 +43,9 @@ public class PluginFailureRetryService extends BaseRetryService implements Retry
@Inject
public PluginFailureRetryService(final NotificationQueueService notificationQueueService,
- final PaymentConfig config,
final PaymentProcessor paymentProcessor,
final InternalCallContextFactory internalCallContextFactory) {
- super(notificationQueueService, config, internalCallContextFactory);
+ super(notificationQueueService, internalCallContextFactory);
this.paymentProcessor = paymentProcessor;
}
diff --git a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
index a01dda5..0da7d2b 100644
--- a/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
+++ b/payment/src/test/java/com/ning/billing/payment/glue/PaymentTestModuleWithMocks.java
@@ -16,8 +16,6 @@
package com.ning.billing.payment.glue;
-import static org.testng.Assert.assertNotNull;
-
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
@@ -28,7 +26,6 @@ import org.skife.config.SimplePropertyConfigSource;
import org.skife.jdbi.v2.IDBI;
import com.ning.billing.ObjectType;
-import com.ning.billing.util.config.PaymentConfig;
import com.ning.billing.mock.glue.MockInvoiceModule;
import com.ning.billing.mock.glue.MockNotificationQueueModule;
import com.ning.billing.payment.dao.MockPaymentDao;
@@ -37,6 +34,7 @@ import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
import com.ning.billing.util.callcontext.CallContextSqlDao;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.MockCallContextSqlDao;
+import com.ning.billing.util.config.PaymentConfig;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.globallocker.MockGlobalLocker;
import com.ning.billing.util.glue.BusModule;
@@ -46,9 +44,11 @@ import com.ning.billing.util.svcapi.tag.TagInternalApi;
import com.ning.billing.util.tag.Tag;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+
+import static org.testng.Assert.assertNotNull;
public class PaymentTestModuleWithMocks extends PaymentModule {
+
public static final String PLUGIN_TEST_NAME = "my-mock";
private void loadSystemPropertiesFromClasspath(final String resource) {
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index c77521d..98be956 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -16,6 +16,8 @@
package com.ning.billing.util.bus.dao;
+import java.util.UUID;
+
import org.joda.time.DateTime;
import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
@@ -29,12 +31,13 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
private final PersistentQueueEntryLifecycleState processingState;
private final String busEventClass;
private final String busEventJson;
+ private final UUID userToken;
private final Long accountRecordId;
private final Long tenantRecordId;
public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
final PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson,
- final Long accountRecordId, final Long tenantRecordId) {
+ final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
this.id = id;
this.createdOwner = createdOwner;
this.owner = owner;
@@ -42,13 +45,14 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
this.processingState = processingState;
this.busEventClass = busEventClass;
this.busEventJson = busEventJson;
+ this.userToken = userToken;
this.accountRecordId = accountRecordId;
this.tenantRecordId = tenantRecordId;
}
public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson,
- final Long accountRecordId, final Long tenantRecordId) {
- this(0, createdOwner, null, null, null, busEventClass, busEventJson, accountRecordId, tenantRecordId);
+ final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ this(0, createdOwner, null, null, null, busEventClass, busEventJson, userToken, accountRecordId, tenantRecordId);
}
public long getId() {
@@ -64,6 +68,11 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
}
@Override
+ public UUID getUserToken() {
+ return userToken;
+ }
+
+ @Override
public String getOwner() {
return owner;
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index 0f2ad73..3203c11 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.bus.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
+import java.util.UUID;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.SQLStatement;
@@ -82,6 +83,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
public void bind(@SuppressWarnings("rawtypes") final SQLStatement stmt, final Bind bind, final BusEventEntry evt) {
stmt.bind("className", evt.getBusEventClass());
stmt.bind("eventJson", evt.getBusEventJson());
+ stmt.bind("userToken", getUUIDString(evt.getUserToken()));
stmt.bind("createdDate", getDate(new DateTime()));
stmt.bind("creatingOwner", evt.getCreatedOwner());
stmt.bind("processingAvailableDate", getDate(evt.getNextAvailableDate()));
@@ -100,6 +102,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
final String className = r.getString("class_name");
final String createdOwner = r.getString("creating_owner");
final String eventJson = r.getString("event_json");
+ final UUID userToken = getUUID(r, "user_token");
final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
final String processingOwner = r.getString("processing_owner");
final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
@@ -107,7 +110,7 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
final Long tenantRecordId = r.getLong("tenant_record_id");
return new BusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState, className,
- eventJson, accountRecordId, tenantRecordId);
+ eventJson, userToken, accountRecordId, tenantRecordId);
}
}
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
index 859fce8..f2fa6ab 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentInternalBus.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ThreadFactory;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +59,8 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
private final String hostname;
private final InternalCallContextFactory internalCallContextFactory;
+ private volatile boolean isStarted;
+
private static final class EventBusDelegate extends EventBus {
public EventBusDelegate(final String busName) {
@@ -96,16 +97,19 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
this.hostname = Hostname.get();
this.internalCallContextFactory = internalCallContextFactory;
+ this.isStarted = false;
}
@Override
public void start() {
startQueue();
+ isStarted = true;
}
@Override
public void stop() {
stopQueue();
+ isStarted = false;
}
@Override
@@ -131,6 +135,11 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
return result;
}
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
private List<BusEventEntry> getNextBusEvent(final InternalCallContext context) {
final Date now = clock.getUTCNow().toDate();
final Date nextAvailable = clock.getUTCNow().plus(DELTA_IN_PROCESSING_TIME_MS).toDate();
@@ -182,16 +191,15 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
private void postFromTransaction(final BusInternalEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
try {
final String json = objectMapper.writeValueAsString(event);
- final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getAccountRecordId(), context.getTenantRecordId());
+ final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getUserToken(), context.getAccountRecordId(), context.getTenantRecordId());
transactional.insertBusEvent(entry, context);
} catch (Exception e) {
log.error("Failed to post BusEvent " + event, e);
}
}
-
private String tweakJsonToIncludeAccountAndTenantRecordId(final String input, final Long accountRecordId, final Long tenantRecordId) {
- int lastIndexPriorFinalBracket = input.lastIndexOf("}");
+ final int lastIndexPriorFinalBracket = input.lastIndexOf("}");
final StringBuilder tmp = new StringBuilder(input.substring(0, lastIndexPriorFinalBracket));
tmp.append(",\"accountRecordId\":");
tmp.append(accountRecordId);
@@ -200,5 +208,4 @@ public class PersistentInternalBus extends PersistentQueueBase implements Intern
tmp.append("}");
return tmp.toString();
}
-
}
diff --git a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
index aad7427..30c0ebd 100644
--- a/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/EntitlementConfig.java
@@ -19,14 +19,5 @@ package com.ning.billing.util.config;
import org.skife.config.Config;
import org.skife.config.Default;
-public interface EntitlementConfig extends NotificationConfig, KillbillConfig {
- @Override
- @Config("killbill.entitlement.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.entitlement.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
+public interface EntitlementConfig extends KillbillConfig {
}
diff --git a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
index a1f8d7b..73db4fc 100644
--- a/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/InvoiceConfig.java
@@ -19,17 +19,7 @@ package com.ning.billing.util.config;
import org.skife.config.Config;
import org.skife.config.Default;
-public interface InvoiceConfig extends NotificationConfig, KillbillConfig {
- @Override
- @Config("killbill.invoice.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.invoice.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
-
+public interface InvoiceConfig extends KillbillConfig {
@Config("killbill.invoice.maxNumberOfMonthsInFuture")
@Default("36")
public int getNumberOfMonthsInFuture();
diff --git a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
index 16a6e4b..64ae12d 100644
--- a/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
+++ b/util/src/main/java/com/ning/billing/util/config/PaymentConfig.java
@@ -22,7 +22,7 @@ import org.skife.config.Config;
import org.skife.config.Default;
-public interface PaymentConfig extends NotificationConfig, KillbillConfig {
+public interface PaymentConfig extends KillbillConfig {
@Config("killbill.payment.provider.default")
@Default("noop")
public String getDefaultPaymentProvider();
@@ -43,16 +43,6 @@ public interface PaymentConfig extends NotificationConfig, KillbillConfig {
@Default("8")
public int getPluginFailureRetryMaxAttempts();
- @Override
- @Config("killbill.payment.engine.notifications.sleep")
- @Default("500")
- public long getSleepTimeMs();
-
- @Override
- @Config("killbill.payment.engine.notifications.off")
- @Default("false")
- public boolean isNotificationProcessingOff();
-
@Config("killbill.payment.off")
@Default("false")
public boolean isPaymentOff();
diff --git a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
index 27cb985..48f2534 100644
--- a/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
+++ b/util/src/main/java/com/ning/billing/util/dao/BinderBase.java
@@ -17,11 +17,17 @@
package com.ning.billing.util.dao;
import java.util.Date;
+import java.util.UUID;
import org.joda.time.DateTime;
public abstract class BinderBase {
+
protected Date getDate(final DateTime dateTime) {
return dateTime == null ? null : dateTime.toDate();
}
+
+ protected String getUUIDString(final UUID uuid) {
+ return uuid == null ? null : uuid.toString();
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
index 557a6e1..7e4e1c9 100644
--- a/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/NotificationQueueModule.java
@@ -16,15 +16,25 @@
package com.ning.billing.util.glue;
+import org.skife.config.ConfigurationObjectFactory;
+
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.AbstractModule;
public class NotificationQueueModule extends AbstractModule {
+
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
+ }
@Override
protected void configure() {
bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 811846e..0a295ae 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -55,12 +55,11 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
public List<Notification> getReadyNotifications(@Bind("now") Date now,
@Bind("owner") String owner,
@Bind("max") int max,
- @Bind("queueName") String queueName,
@InternalTenantContextBinder final InternalTenantContext context);
@SqlQuery
@Mapper(NotificationSqlMapper.class)
- public List<Notification> getNotificationForAccountAndDate(@Bind("accountId") final String accountId,
+ public List<Notification> getNotificationForAccountAndDate(@Bind("accountRecordId") final long accountRecordId,
@Bind("effectiveDate") final Date effectiveDate,
@InternalTenantContextBinder final InternalTenantContext context);
@@ -102,7 +101,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
stmt.bind("createdDate", getDate(new DateTime()));
stmt.bind("creatingOwner", evt.getCreatedOwner());
stmt.bind("className", evt.getNotificationKeyClass());
- stmt.bind("accountId", evt.getAccountId() != null ? evt.getAccountId().toString() : null);
+ // The current user token will be bound with the InternalTenantContextBinder
+ stmt.bind("futureUserToken", getUUIDString(evt.getFutureUserToken()));
stmt.bind("notificationKey", evt.getNotificationKey());
stmt.bind("effectiveDate", getDate(evt.getEffectiveDate()));
stmt.bind("queueName", evt.getQueueName());
@@ -123,7 +123,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final String createdOwner = r.getString("creating_owner");
final String className = r.getString("class_name");
final String notificationKey = r.getString("notification_key");
- final UUID accountId = getUUID(r, "account_id");
+ final UUID userToken = getUUID(r, "user_token");
+ final UUID futureUserToken = getUUID(r, "future_user_token");
final String queueName = r.getString("queue_name");
final DateTime effectiveDate = getDateTime(r, "effective_date");
final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
@@ -133,7 +134,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final Long tenantRecordId = r.getLong("tenant_record_id");
return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
- processingState, className, notificationKey, accountId, effectiveDate,
+ processingState, className, notificationKey, userToken, futureUserToken, effectiveDate,
accountRecordId, tenantRecordId);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 848ed9a..c5310a4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -32,15 +32,16 @@ public class DefaultNotification extends EntityBase implements Notification {
private final PersistentQueueEntryLifecycleState lifecycleState;
private final String notificationKeyClass;
private final String notificationKey;
+ private final UUID userToken;
+ private final UUID futureUserToken;
private final DateTime effectiveDate;
- private final UUID accountId;
private final Long accountRecordId;
private final Long tenantRecordId;
public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName,
final DateTime nextAvailableDate, final PersistentQueueEntryLifecycleState lifecycleState,
- final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate,
- final Long accountRecordId, final Long tenantRecordId) {
+ final String notificationKeyClass, final String notificationKey, final UUID userToken, final UUID futureUserToken,
+ final DateTime effectiveDate, final Long accountRecordId, final Long tenantRecordId) {
super(id);
this.ordering = ordering;
this.owner = owner;
@@ -50,17 +51,18 @@ public class DefaultNotification extends EntityBase implements Notification {
this.lifecycleState = lifecycleState;
this.notificationKeyClass = notificationKeyClass;
this.notificationKey = notificationKey;
- this.accountId = accountId;
+ this.userToken = userToken;
+ this.futureUserToken = futureUserToken;
this.effectiveDate = effectiveDate;
this.accountRecordId = accountRecordId;
this.tenantRecordId = tenantRecordId;
}
public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass,
- final String notificationKey, final UUID accountId, final DateTime effectiveDate,
+ final String notificationKey, final UUID userToken, final UUID futureUserToken, final DateTime effectiveDate,
final Long accountRecordId, final Long tenantRecordId) {
this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE,
- notificationKeyClass, notificationKey, accountId, effectiveDate, accountRecordId, tenantRecordId);
+ notificationKeyClass, notificationKey, userToken, futureUserToken, effectiveDate, accountRecordId, tenantRecordId);
}
@Override
@@ -128,8 +130,13 @@ public class DefaultNotification extends EntityBase implements Notification {
}
@Override
- public UUID getAccountId() {
- return accountId;
+ public UUID getUserToken() {
+ return userToken;
+ }
+
+ @Override
+ public UUID getFutureUserToken() {
+ return futureUserToken;
}
@Override
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 6f58a2e..2780faa 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -17,156 +17,162 @@
package com.ning.billing.util.notificationq;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import javax.annotation.Nullable;
-
import org.joda.time.DateTime;
+import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
+import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.util.Hostname;
import com.ning.billing.util.config.NotificationConfig;
-import com.ning.billing.util.callcontext.CallOrigin;
+
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
-import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
-public class DefaultNotificationQueue extends NotificationQueueBase {
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+
+public class DefaultNotificationQueue implements NotificationQueue {
private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);
+ private final IDBI dbi;
private final NotificationSqlDao dao;
- private final InternalCallContextFactory internalCallContextFactory;
+ private final String hostname;
- public DefaultNotificationQueue(final IDBI dbi, final Clock clock, final String svcName, final String queueName,
- final NotificationQueueHandler handler, final NotificationConfig config,
- final InternalCallContextFactory internalCallContextFactory) {
- super(clock, svcName, queueName, handler, config);
- this.dao = dbi.onDemand(NotificationSqlDao.class);
- this.internalCallContextFactory = internalCallContextFactory;
- }
+ private final String svcName;
+ private final String queueName;
- @Override
- public int doProcessEvents() {
- logDebug("ENTER doProcessEvents");
- // Finding and claiming notifications is not done per tenant (yet?)
- final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
- if (notifications.size() == 0) {
- logDebug("EXIT doProcessEvents");
- return 0;
- }
+ private final ObjectMapper objectMapper;
- logDebug("START processing %d events at time %s", notifications.size(), getClock().getUTCNow().toDate());
-
- int result = 0;
- for (final Notification cur : notifications) {
- getNbProcessedEvents().incrementAndGet();
- logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
- result++;
- clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
- logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- }
+ private final NotificationQueueHandler handler;
+
+ private final NotificationQueueService notificationQueueService;
+
+ private volatile boolean isStarted;
- return result;
+ public DefaultNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler,
+ final IDBI dbi, final NotificationQueueService notificationQueueService) {
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.dbi = dbi;
+ this.dao = dbi.onDemand(NotificationSqlDao.class);
+ this.hostname = Hostname.get();
+ this.notificationQueueService = notificationQueueService;
+ this.objectMapper = new ObjectMapper();
}
+
@Override
public void recordFutureNotification(final DateTime futureNotificationTime,
- final UUID accountId,
final NotificationKey notificationKey,
final InternalCallContext context) throws IOException {
- recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, dao, context);
+ recordFutureNotificationInternal(futureNotificationTime, notificationKey, dao, context);
}
@Override
public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
final DateTime futureNotificationTime,
- final UUID accountId,
final NotificationKey notificationKey,
final InternalCallContext context) throws IOException {
final NotificationSqlDao transactionalNotificationDao = transactionalDao.transmogrify(NotificationSqlDao.class);
- recordFutureNotificationInternal(futureNotificationTime, accountId, notificationKey, transactionalNotificationDao, context);
+ recordFutureNotificationInternal(futureNotificationTime, notificationKey, transactionalNotificationDao, context);
}
private void recordFutureNotificationInternal(final DateTime futureNotificationTime,
- final UUID accountId,
final NotificationKey notificationKey,
final NotificationSqlDao thisDao,
final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
- accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
- thisDao.insertNotification(notification, context);
- }
+ final UUID futureUserToken = UUID.randomUUID();
- private void clearNotification(final Notification cleared, final InternalCallContext context) {
- dao.clearNotification(cleared.getId().toString(), getHostname(), context);
+ final Notification notification = new DefaultNotification(getFullQName(), hostname, notificationKey.getClass().getName(), json,
+ context.getUserToken(), futureUserToken, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
+ thisDao.insertNotification(notification, context);
}
- private List<Notification> getReadyNotifications(final InternalCallContext context) {
- final Date now = getClock().getUTCNow().toDate();
- final Date nextAvailable = getClock().getUTCNow().plus(CLAIM_TIME_MS).toDate();
- final List<Notification> input = dao.getReadyNotifications(now, getHostname(), CLAIM_TIME_MS, getFullQName(), context);
- final List<Notification> claimedNotifications = new ArrayList<Notification>();
- for (final Notification cur : input) {
- logDebug("about to claim notification %s, key = %s for time %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
- final boolean claimed = (dao.claimNotification(getHostname(), nextAvailable, cur.getId().toString(), now, context) == 1);
- logDebug("claimed notification %s, key = %s for time %s result = %s",
- cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate(), claimed);
+ @Override
+ public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
+ dao.removeNotificationsByKey(notificationKey.toString(), context);
+ }
- if (claimed) {
- claimedNotifications.add(cur);
- dao.insertClaimedHistory(getHostname(), now, cur.getId().toString(), context);
+ @Override
+ public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+ // TODO we have the same use case in InternalCallContextFactory, do we need some sort of helper class?
+ final Long accountRecordId = dbi.withHandle(new HandleCallback<Long>() {
+ @Override
+ public Long withHandle(final Handle handle) throws Exception {
+ final List<Map<String, Object>> values = handle.select("select record_id from accounts where id = " + accountId.toString());
+ if (values.size() == 0) {
+ return null;
+ } else {
+ return (Long) values.get(0).get("record_id");
+ }
}
- }
+ });
- for (final Notification cur : claimedNotifications) {
- if (cur.getOwner() != null && !cur.getOwner().equals(getHostname())) {
- log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), cur, cur.getOwner()});
- }
+ if (accountId == null) {
+ return ImmutableList.<Notification>of();
+ } else {
+ return dao.getNotificationForAccountAndDate(accountRecordId, effectiveDate.toDate(), context);
}
+ }
- return claimedNotifications;
+ @Override
+ public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+ dao.removeNotification(notificationId.toString(), context);
}
- private void logDebug(final String format, final Object... args) {
- if (log.isDebugEnabled()) {
- final String realDebug = String.format(format, args);
- log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), realDebug));
- }
+ @Override
+ public String getFullQName() {
+ return NotificationQueueServiceBase.getCompositeName(svcName, queueName);
}
@Override
- public void removeNotificationsByKey(final NotificationKey notificationKey, final InternalCallContext context) {
- dao.removeNotificationsByKey(notificationKey.toString(), context);
+ public String getServiceName() {
+ return svcName;
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- return dao.getNotificationForAccountAndDate(accountId.toString(), effectiveDate.toDate(), context);
+ public String getQueueName() {
+ return queueName;
}
@Override
- public void removeNotification(final UUID notificationId, final InternalCallContext context) {
- dao.removeNotification(notificationId.toString(), context);
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ notificationQueueService.startQueue();
+ isStarted = true;
}
- private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
- return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ @Override
+ public void stopQueue() {
+ // Order matters...
+ isStarted = false;
+ notificationQueueService.stopQueue();
}
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
index 350a140..d56023c 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueueService.java
@@ -21,25 +21,26 @@ import org.skife.jdbi.v2.IDBI;
import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import com.google.inject.Inject;
public class DefaultNotificationQueueService extends NotificationQueueServiceBase {
+
private final IDBI dbi;
- private final InternalCallContextFactory internalCallContextFactory;
@Inject
- public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
- super(clock);
+ public DefaultNotificationQueueService(final IDBI dbi, final Clock clock, final NotificationQueueConfig config,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
this.dbi = dbi;
- this.internalCallContextFactory = internalCallContextFactory;
}
+
@Override
protected NotificationQueue createNotificationQueueInternal(final String svcName,
final String queueName,
- final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new DefaultNotificationQueue(dbi, clock, svcName, queueName, handler, config, internalCallContextFactory);
+ final NotificationQueueHandler handler) {
+ return new DefaultNotificationQueue(svcName, queueName, handler, dbi, this);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index a790d15..4c28ff5 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -35,6 +35,8 @@ public interface Notification extends PersistentQueueEntryLifecycle, Entity {
public String getQueueName();
- // TODO - do we still need it now we have account_record_id?
- public UUID getAccountId();
+ // Future user token, i.e. user token of the context when this notification will be claimed.
+ // The user token can be used as a trace to follow events (e.g. all bus events triggered as a result of a
+ // claimed notification will share the same user token)
+ public UUID getFutureUserToken();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
index e592674..58b49c4 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueue.java
@@ -21,12 +21,11 @@ import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.util.callcontext.InternalCallContext;
-import com.ning.billing.util.entity.dao.EntityDao;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
+import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
import com.ning.billing.util.queue.QueueLifecycle;
public interface NotificationQueue extends QueueLifecycle {
@@ -38,7 +37,6 @@ public interface NotificationQueue extends QueueLifecycle {
* @param notificationKey the key for that notification
*/
public void recordFutureNotification(final DateTime futureNotificationTime,
- final UUID accountId,
final NotificationKey notificationKey,
final InternalCallContext context)
throws IOException;
@@ -52,7 +50,6 @@ public interface NotificationQueue extends QueueLifecycle {
*/
public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao,
final DateTime futureNotificationTime,
- final UUID accountId,
final NotificationKey notificationKey,
final InternalCallContext context)
throws IOException;
@@ -70,13 +67,6 @@ public interface NotificationQueue extends QueueLifecycle {
public void removeNotification(final UUID notificationId,
final InternalCallContext context);
- /**
- * This is only valid when the queue has been configured with isNotificationProcessingOff is true
- * In which case, it will callback users for all the ready notifications.
- *
- * @return the number of entries we processed
- */
- public int processReadyNotification();
/**
* @return the name of that queue
@@ -92,4 +82,6 @@ public interface NotificationQueue extends QueueLifecycle {
* @return the queue name associated
*/
public String getQueueName();
+
+ public NotificationQueueHandler getHandler();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index d6df6dd..cae7775 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -16,11 +16,14 @@
package com.ning.billing.util.notificationq;
+import java.util.UUID;
+
import org.joda.time.DateTime;
import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.QueueLifecycle;
-public interface NotificationQueueService {
+public interface NotificationQueueService extends QueueLifecycle {
public interface NotificationQueueHandler {
@@ -28,10 +31,11 @@ public interface NotificationQueueService {
* Called for each notification ready
*
* @param notificationKey the notification key associated to that notification entry
+ * @param userToken user token associated with that notification entry
* @param accountRecordId account record id associated with that notification entry
* @param tenantRecordId tenant record id associated with that notification entry
*/
- public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, Long accountRecordId, Long tenantRecordId);
+ public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, UUID userToken, Long accountRecordId, Long tenantRecordId);
}
public static final class NotificationQueueAlreadyExists extends Exception {
@@ -58,12 +62,11 @@ public interface NotificationQueueService {
* @param svcName the name of the service using that queue
* @param queueName a name for that queue (unique per service)
* @param handler the handler required for notifying the caller of state change
- * @param config the notification queue configuration
* @return a new NotificationQueue
* @throws com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists
* is the queue associated with that service and name already exits
*/
- public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config)
+ public NotificationQueue createNotificationQueue(final String svcName, final String queueName, final NotificationQueueHandler handler)
throws NotificationQueueAlreadyExists;
/**
@@ -89,8 +92,7 @@ public interface NotificationQueueService {
throws NoSuchNotificationQueue;
/**
- * @param services
* @return the number of processed notifications
*/
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning);
+ public int triggerManualQueueProcessing(final Boolean keepRunning);
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
index 8763192..99dd74f 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueServiceBase.java
@@ -17,39 +17,29 @@
package com.ning.billing.util.notificationq;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.skife.jdbi.v2.IDBI;
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
-import com.google.common.base.Joiner;
import com.google.inject.Inject;
-public abstract class NotificationQueueServiceBase implements NotificationQueueService {
- protected final Logger log = LoggerFactory.getLogger(DefaultNotificationQueueService.class);
+public abstract class NotificationQueueServiceBase extends NotificationQueueDispatcher implements NotificationQueueService {
- protected final Clock clock;
-
- private final Map<String, NotificationQueue> queues;
@Inject
- public NotificationQueueServiceBase(final Clock clock) {
- this.clock = clock;
- this.queues = new TreeMap<String, NotificationQueue>();
+ public NotificationQueueServiceBase(final Clock clock, final NotificationQueueConfig config, final IDBI dbi,
+ final InternalCallContextFactory internalCallContextFactory) {
+ super(clock, config, dbi, internalCallContextFactory);
}
@Override
public NotificationQueue createNotificationQueue(final String svcName,
final String queueName,
- final NotificationQueueHandler handler,
- final NotificationConfig config) throws NotificationQueueAlreadyExists {
- if (svcName == null || queueName == null || handler == null || config == null) {
+ final NotificationQueueHandler handler) throws NotificationQueueAlreadyExists {
+ if (svcName == null || queueName == null || handler == null) {
throw new RuntimeException("Need to specify all parameters");
}
@@ -61,7 +51,7 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
throw new NotificationQueueAlreadyExists(String.format("Queue for svc %s and name %s already exist",
svcName, queueName));
}
- result = createNotificationQueueInternal(svcName, queueName, handler, config);
+ result = createNotificationQueueInternal(svcName, queueName, handler);
queues.put(compositeName, result);
}
return result;
@@ -96,33 +86,29 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
}
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("NotificationQueueServiceBase");
+ sb.append("{queues=").append(queues);
+ sb.append('}');
+ return sb.toString();
+ }
+
+
//
// Test ONLY
//
@Override
- public int triggerManualQueueProcessing(final String[] services, final Boolean keepRunning) {
+ public int triggerManualQueueProcessing(final Boolean keepRunning) {
int result = 0;
- List<NotificationQueue> manualQueues = null;
- if (services == null) {
- manualQueues = new ArrayList<NotificationQueue>(queues.values());
- } else {
- final Joiner join = Joiner.on(",");
- join.join(services);
-
- log.info("Trigger manual processing for services {} ", join.toString());
- manualQueues = new LinkedList<NotificationQueue>();
- synchronized (queues) {
- for (final String svc : services) {
- addQueuesForService(manualQueues, svc);
- }
- }
- }
+ List<NotificationQueue> manualQueues = new ArrayList<NotificationQueue>(queues.values());
for (final NotificationQueue cur : manualQueues) {
int processedNotifications = 0;
do {
- processedNotifications = cur.processReadyNotification();
+ doProcessEventsWithLimit(1);
log.info("Got {} results from queue {}", processedNotifications, cur.getFullQName());
result += processedNotifications;
} while (keepRunning && processedNotifications > 0);
@@ -130,28 +116,6 @@ public abstract class NotificationQueueServiceBase implements NotificationQueueS
return result;
}
- private void addQueuesForService(final List<NotificationQueue> result, final String svcName) {
- for (final String cur : queues.keySet()) {
- if (cur.startsWith(svcName)) {
- result.add(queues.get(cur));
- }
- }
- }
-
protected abstract NotificationQueue createNotificationQueueInternal(String svcName,
- String queueName, NotificationQueueHandler handler,
- NotificationConfig config);
-
- public static String getCompositeName(final String svcName, final String queueName) {
- return svcName + ":" + queueName;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("NotificationQueueServiceBase");
- sb.append("{queues=").append(queues);
- sb.append('}');
- return sb.toString();
- }
+ String queueName, NotificationQueueHandler handler);
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
index 8deea0e..2fa9baa 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueBase.java
@@ -34,23 +34,25 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
private final int nbThreads;
private final Executor executor;
- private final String svcName;
+ private final String svcQName;
private final long sleepTimeMs;
private boolean isProcessingEvents;
private int curActiveThreads;
protected final ObjectMapper objectMapper;
-
- public PersistentQueueBase(final String svcName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
+
+
+ public PersistentQueueBase(final String svcQName, final Executor executor, final int nbThreads, final PersistentQueueConfig config) {
this.executor = executor;
this.nbThreads = nbThreads;
- this.svcName = svcName;
+ this.svcQName = svcQName;
this.sleepTimeMs = config.getSleepTimeMs();
this.objectMapper = new ObjectMapper();
this.isProcessingEvents = false;
this.curActiveThreads = 0;
}
+
@Override
public void startQueue() {
@@ -61,7 +63,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final CountDownLatch doneInitialization = new CountDownLatch(nbThreads);
log.info(String.format("%s: Starting with %d threads",
- svcName, nbThreads));
+ svcQName, nbThreads));
for (int i = 0; i < nbThreads; i++) {
executor.execute(new Runnable() {
@@ -69,7 +71,7 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
public void run() {
log.info(String.format("%s: Thread %s [%d] starting",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()));
@@ -93,19 +95,19 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
doProcessEvents();
} catch (Exception e) {
log.warn(String.format("%s: Thread %s [%d] got an exception, catching and moving on...",
- svcName,
+ svcQName,
Thread.currentThread().getName(),
Thread.currentThread().getId()), e);
}
sleepALittle();
}
} catch (InterruptedException e) {
- log.info(String.format("%s: Thread %s got interrupted, exting... ", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s got interrupted, exting... ", svcQName, Thread.currentThread().getName()));
} catch (Throwable e) {
- log.error(String.format("%s: Thread %s got an exception, exting... ", svcName, Thread.currentThread().getName()), e);
+ log.error(String.format("%s: Thread %s got an exception, exting... ", svcQName, Thread.currentThread().getName()), e);
} finally {
- log.info(String.format("%s: Thread %s has exited", svcName, Thread.currentThread().getName()));
+ log.info(String.format("%s: Thread %s has exited", svcQName, Thread.currentThread().getName()));
synchronized (thePersistentQ) {
curActiveThreads--;
}
@@ -121,12 +123,12 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
final boolean success = doneInitialization.await(waitTimeoutMs, TimeUnit.MILLISECONDS);
if (!success) {
- log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.warn(String.format("%s: Failed to wait for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
} else {
- log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcName, (nbThreads - doneInitialization.getCount()), nbThreads));
+ log.info(String.format("%s: Done waiting for all threads to be started, got %d/%d", svcQName, (nbThreads - doneInitialization.getCount()), nbThreads));
}
} catch (InterruptedException e) {
- log.warn(String.format("%s: Start sequence, got interrupted", svcName));
+ log.warn(String.format("%s: Start sequence, got interrupted", svcQName));
}
}
@@ -147,17 +149,17 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
} catch (InterruptedException ignore) {
- log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence has been interrupted, remaining active threads = %d", svcQName, curActiveThreads));
} finally {
if (remaining > 0) {
- log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.error(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
} else {
- log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcName, curActiveThreads));
+ log.info(String.format("%s: Stop sequence completed with %d active remaing threads", svcQName, curActiveThreads));
}
curActiveThreads = 0;
}
}
-
+
protected <T> T deserializeEvent(final String className, final String json) {
try {
final Class<?> claz = Class.forName(className);
@@ -168,9 +170,8 @@ public abstract class PersistentQueueBase implements QueueLifecycle {
}
}
+ public abstract int doProcessEvents();
+ public abstract boolean isStarted();
-
- @Override
- public abstract int doProcessEvents();
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
index ea4ab88..922ed34 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
@@ -16,6 +16,8 @@
package com.ning.billing.util.queue;
+import java.util.UUID;
+
import org.joda.time.DateTime;
public interface PersistentQueueEntryLifecycle {
@@ -40,4 +42,8 @@ public interface PersistentQueueEntryLifecycle {
public PersistentQueueEntryLifecycleState getProcessingState();
public boolean isAvailableForProcessing(DateTime now);
+
+ // User token associated with this bus event or notification (i.e. user token from the context that
+ // was used to generate this PersistentQueueEntryLifecycle)
+ public UUID getUserToken();
}
diff --git a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
index b839468..e183bb9 100644
--- a/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/QueueLifecycle.java
@@ -26,8 +26,5 @@ public interface QueueLifecycle {
*/
public void stopQueue();
- /**
- * Processes event from queue
- */
- public int doProcessEvents();
+ public boolean isStarted();
}
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index 5d67ee3..18ec252 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -8,6 +8,7 @@ getNextBusEventEntry() ::= <<
record_id
, class_name
, event_json
+ , user_token
, created_date
, creating_owner
, processing_owner
@@ -64,6 +65,7 @@ insertBusEvent() ::= <<
insert into bus_events (
class_name
, event_json
+ , user_token
, created_date
, creating_owner
, processing_owner
@@ -74,6 +76,7 @@ insertBusEvent() ::= <<
) values (
:className
, :eventJson
+ , :userToken
, :createdDate
, :creatingOwner
, :processingOwner
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 157001b..2a19753 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -130,8 +130,9 @@ CREATE TABLE notifications (
id char(36) NOT NULL,
created_date datetime NOT NULL,
class_name varchar(256) NOT NULL,
- account_id char(36),
notification_key varchar(2048) NOT NULL,
+ user_token char(36),
+ future_user_token char(36),
creating_owner char(50) NOT NULL,
effective_date datetime NOT NULL,
queue_name char(64) NOT NULL,
@@ -143,7 +144,7 @@ CREATE TABLE notifications (
PRIMARY KEY(record_id)
);
CREATE UNIQUE INDEX notifications_id ON notifications(id);
-CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `queue_name`, `processing_state`,`processing_owner`,`processing_available_date`);
+CREATE INDEX `idx_comp_where` ON notifications (`effective_date`, `processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_update` ON notifications (`processing_state`,`processing_owner`,`processing_available_date`);
CREATE INDEX `idx_get_ready` ON notifications (`effective_date`,`created_date`,`id`);
CREATE INDEX notifications_tenant_account_record_id ON notifications(tenant_record_id, account_record_id);
@@ -185,8 +186,9 @@ CREATE INDEX audit_log_tenant_account_record_id ON audit_log(tenant_record_id, a
DROP TABLE IF EXISTS bus_events;
CREATE TABLE bus_events (
record_id int(11) unsigned NOT NULL AUTO_INCREMENT,
- class_name varchar(128) NOT NULL,
- event_json varchar(2048) NOT NULL,
+ class_name varchar(128) NOT NULL,
+ event_json varchar(2048) NOT NULL,
+ user_token char(36),
created_date datetime NOT NULL,
creating_owner char(50) NOT NULL,
processing_owner char(50) DEFAULT NULL,
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index f0d9187..895ea5d 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -8,8 +8,9 @@ getReadyNotifications() ::= <<
record_id
, id
, class_name
- , account_id
, notification_key
+ , user_token
+ , future_user_token
, created_date
, creating_owner
, effective_date
@@ -22,7 +23,6 @@ getReadyNotifications() ::= <<
from notifications
where
effective_date \<= :now
- and queue_name = :queueName
and processing_state != 'PROCESSED'
and processing_state != 'REMOVED'
and (processing_owner IS NULL OR processing_available_date \<= :now)
@@ -36,11 +36,12 @@ getReadyNotifications() ::= <<
getNotificationForAccountAndDate() ::= <<
select
- record_id
+ record_id
, id
, class_name
- , account_id
, notification_key
+ , user_token
+ , future_user_token
, created_date
, creating_owner
, effective_date
@@ -52,7 +53,7 @@ getNotificationForAccountAndDate() ::= <<
, tenant_record_id
from notifications
where
- account_id = :accountId AND effective_date = :effectiveDate
+ account_record_id = :accountRecordId AND effective_date = :effectiveDate
;
>>
@@ -101,8 +102,9 @@ insertNotification() ::= <<
insert into notifications (
id
, class_name
- , account_id
, notification_key
+ , user_token
+ , future_user_token
, created_date
, creating_owner
, effective_date
@@ -115,8 +117,9 @@ insertNotification() ::= <<
) values (
:id
, :className
- , :accountId
, :notificationKey
+ , :userToken
+ , :futureUserToken
, :createdDate
, :creatingOwner
, :effectiveDate
diff --git a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
index aed75f5..93ef001 100644
--- a/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
+++ b/util/src/test/java/com/ning/billing/mock/glue/MockNotificationQueueModule.java
@@ -16,16 +16,24 @@
package com.ning.billing.mock.glue;
+import org.skife.config.ConfigurationObjectFactory;
+
import com.ning.billing.util.notificationq.MockNotificationQueueService;
+import com.ning.billing.util.notificationq.NotificationQueueConfig;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.google.inject.AbstractModule;
public class MockNotificationQueueModule extends AbstractModule {
+ protected void configureNotificationQueueConfig() {
+ final NotificationQueueConfig config = new ConfigurationObjectFactory(System.getProperties()).build(NotificationQueueConfig.class);
+ bind(NotificationQueueConfig.class).toInstance(config);
+ }
@Override
protected void configure() {
bind(NotificationQueueService.class).to(MockNotificationQueueService.class).asEagerSingleton();
+ configureNotificationQueueConfig();
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 2634e62..a3266d9 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -26,7 +26,6 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@@ -45,7 +44,6 @@ import static org.testng.Assert.assertNotNull;
@Guice(modules = TestNotificationSqlDao.TestNotificationSqlDaoModule.class)
public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
- private static final UUID accountId = UUID.randomUUID();
private static final String hostname = "Yop";
@Inject
@@ -58,31 +56,20 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
dao = dbi.onDemand(NotificationSqlDao.class);
}
- @BeforeTest(groups = "slow")
- public void cleanupDb() {
- dbi.withHandle(new HandleCallback<Void>() {
- @Override
- public Void withHandle(final Handle handle) throws Exception {
- handle.execute("delete from notifications");
- handle.execute("delete from claimed_notifications");
- return null;
- }
- });
- }
-
@Test(groups = "slow")
public void testBasic() throws InterruptedException {
+ final long accountRecordId = 1242L;
final String ownerId = UUID.randomUUID().toString();
final String notificationKey = UUID.randomUUID().toString();
final DateTime effDt = new DateTime();
- final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
- null, internalCallContext.getTenantRecordId());
+ final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+ accountRecordId, internalCallContext.getTenantRecordId());
dao.insertNotification(notif, internalCallContext);
Thread.sleep(1000);
final DateTime now = new DateTime();
- final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, "testBasic", internalCallContext);
+ final List<Notification> notifications = dao.getReadyNotifications(now.toDate(), hostname, 3, internalCallContext);
assertNotNull(notifications);
assertEquals(notifications.size(), 1);
@@ -117,24 +104,25 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
@Test(groups = "slow")
public void testGetByAccountAndDate() throws InterruptedException {
+ final long accountRecordId = 1242L;
final String notificationKey = UUID.randomUUID().toString();
final DateTime effDt = new DateTime();
- final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
- null, internalCallContext.getTenantRecordId());
+ final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+ accountRecordId, internalCallContext.getTenantRecordId());
dao.insertNotification(notif1, internalCallContext);
- final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
- null, internalCallContext.getTenantRecordId());
+ final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, UUID.randomUUID(), UUID.randomUUID(), effDt,
+ accountRecordId, internalCallContext.getTenantRecordId());
dao.insertNotification(notif2, internalCallContext);
- List<Notification> notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+ List<Notification> notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
assertEquals(notifications.size(), 2);
for (final Notification cur : notifications) {
Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.AVAILABLE);
dao.removeNotification(cur.getId().toString(), internalCallContext);
}
- notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
+ notifications = dao.getNotificationForAccountAndDate(accountRecordId, effDt.toDate(), internalCallContext);
assertEquals(notifications.size(), 2);
for (final Notification cur : notifications) {
Assert.assertEquals(cur.getProcessingState(), PersistentQueueEntryLifecycleState.REMOVED);
@@ -149,8 +137,9 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
" record_id " +
", id" +
", class_name" +
- ", account_id" +
", notification_key" +
+ ", user_token" +
+ ", future_user_token" +
", created_date" +
", creating_owner" +
", effective_date" +
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index ec917f2..ac267a7 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -24,25 +24,39 @@ import java.util.TreeSet;
import java.util.UUID;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
-import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.Hostname;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoWrapperFactory;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
-import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import com.fasterxml.jackson.databind.ObjectMapper;
-public class MockNotificationQueue extends NotificationQueueBase implements NotificationQueue {
+public class MockNotificationQueue implements NotificationQueue {
+
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final String hostname;
private final TreeSet<Notification> notifications;
+ private final Clock clock;
+ private final String svcName;
+ private final String queueName;
+ private final NotificationQueueHandler handler;
+ private final MockNotificationQueueService queueService;
+
+ private volatile boolean isStarted;
+
+ public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final MockNotificationQueueService mockNotificationQueueService) {
+
+ this.svcName = svcName;
+ this.queueName = queueName;
+ this.handler = handler;
+ this.clock = clock;
+ this.hostname = Hostname.get();
+ this.queueService = mockNotificationQueueService;
- public MockNotificationQueue(final Clock clock, final String svcName, final String queueName, final NotificationQueueHandler handler, final NotificationConfig config) {
- super(clock, svcName, queueName, handler, config);
notifications = new TreeSet<Notification>(new Comparator<Notification>() {
@Override
public int compare(final Notification o1, final Notification o2) {
@@ -56,11 +70,10 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
}
@Override
- public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
- final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+ public void recordFutureNotification(final DateTime futureNotificationTime, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
- null, 0L);
+ final Notification notification = new DefaultNotification("MockQueue", hostname, notificationKey.getClass().getName(), json, context.getUserToken(),
+ UUID.randomUUID(), futureNotificationTime, null, 0L);
synchronized (notifications) {
notifications.add(notification);
}
@@ -68,85 +81,116 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
@Override
public void recordFutureNotificationFromTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> transactionalDao, final DateTime futureNotificationTime,
- final UUID accountId, final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
- recordFutureNotification(futureNotificationTime, accountId, notificationKey, context);
+ final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
+ recordFutureNotification(futureNotificationTime, notificationKey, context);
}
- public List<Notification> getPendingEvents() {
- final List<Notification> result = new ArrayList<Notification>();
+ @Override
+ public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
+ final List<Notification> toClearNotifications = new ArrayList<Notification>();
for (final Notification notification : notifications) {
- if (notification.getProcessingState() == PersistentQueueEntryLifecycleState.AVAILABLE) {
- result.add(notification);
+ if (notification.getNotificationKey().equals(key.toString())) {
+ toClearNotifications.add(notification);
}
}
- return result;
+ synchronized (notifications) {
+ if (toClearNotifications.size() > 0) {
+ notifications.removeAll(toClearNotifications);
+ }
+ }
}
@Override
- public int doProcessEvents() {
- final int result;
- final List<Notification> processedNotifications = new ArrayList<Notification>();
- final List<Notification> oldNotifications = new ArrayList<Notification>();
-
- final List<Notification> readyNotifications = new ArrayList<Notification>();
+ public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
+ /*
+ final List<Notification> result = new ArrayList<Notification>();
synchronized (notifications) {
- for (final Notification cur : notifications) {
- if (cur.isAvailableForProcessing(getClock().getUTCNow())) {
- readyNotifications.add(cur);
+ for (Notification cur : notifications) {
+ if (cur.getAccountId().equals(accountId) || cur.getEffectiveDate().compareTo(effectiveDate) == 0) {
+ result.add(cur);
}
}
}
+ return result;
+ */
+ return null;
+ }
- result = readyNotifications.size();
- for (final Notification cur : readyNotifications) {
- final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
- final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
- "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
- PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
- cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
- cur.getAccountRecordId(), cur.getTenantRecordId());
- oldNotifications.add(cur);
- processedNotifications.add(processedNotification);
- }
+ @Override
+ public void removeNotification(final UUID notificationId, final InternalCallContext context) {
synchronized (notifications) {
- if (oldNotifications.size() > 0) {
- notifications.removeAll(oldNotifications);
- }
-
- if (processedNotifications.size() > 0) {
- notifications.addAll(processedNotifications);
+ for (Notification cur : notifications) {
+ if (cur.getId().equals(notificationId)) {
+ notifications.remove(cur);
+ break;
+ }
}
}
- return result;
}
@Override
- public void removeNotificationsByKey(final NotificationKey key, final InternalCallContext context) {
- final List<Notification> toClearNotifications = new ArrayList<Notification>();
- for (final Notification notification : notifications) {
- if (notification.getNotificationKey().equals(key.toString())) {
- toClearNotifications.add(notification);
- }
- }
+ public String getFullQName() {
+ return NotificationQueueDispatcher.getCompositeName(svcName, queueName);
+ }
- synchronized (notifications) {
- if (toClearNotifications.size() > 0) {
- notifications.removeAll(toClearNotifications);
- }
- }
+ @Override
+ public String getServiceName() {
+ return svcName;
}
@Override
- public List<Notification> getNotificationForAccountAndDate(final UUID accountId, final DateTime effectiveDate, final InternalCallContext context) {
- return null;
+ public String getQueueName() {
+ return queueName;
}
@Override
- public void removeNotification(final UUID notificationId, final InternalCallContext context) {
+ public NotificationQueueHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void startQueue() {
+ isStarted = true;
+ queueService.startQueue();
+ }
+
+ @Override
+ public void stopQueue() {
+ isStarted = false;
+ queueService.stopQueue();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
+
+ public List<Notification> getReadyNotifications() {
+ final int result;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ final List<Notification> readyNotifications = new ArrayList<Notification>();
+ synchronized (notifications) {
+ for (final Notification cur : notifications) {
+ if (cur.isAvailableForProcessing(clock.getUTCNow())) {
+ readyNotifications.add(cur);
+ }
+ }
+ }
+ return readyNotifications;
}
+
+ public void markProcessedNotifications(final List<Notification> toBeremoved, final List<Notification> toBeAdded) {
+ synchronized (notifications) {
+ notifications.removeAll(toBeremoved);
+ notifications.addAll(toBeAdded);
+ }
+ }
+
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
index ee5ca32..0284747 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueueService.java
@@ -16,22 +16,65 @@
package com.ning.billing.util.notificationq;
-import com.ning.billing.util.config.NotificationConfig;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.config.NotificationConfig;
+import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueueEntryLifecycleState;
import com.google.inject.Inject;
public class MockNotificationQueueService extends NotificationQueueServiceBase {
@Inject
- public MockNotificationQueueService(final Clock clock) {
- super(clock);
+ public MockNotificationQueueService(final Clock clock, final NotificationQueueConfig config) {
+ super(clock, config, null, null);
+ }
+
+
+ @Override
+ protected NotificationQueue createNotificationQueueInternal(final String svcName, final String queueName,
+ final NotificationQueueHandler handler) {
+ return new MockNotificationQueue(clock, svcName, queueName, handler, this);
}
+
@Override
- protected NotificationQueue createNotificationQueueInternal(final String svcName,
- final String queueName, final NotificationQueueHandler handler,
- final NotificationConfig config) {
- return new MockNotificationQueue(clock, svcName, queueName, handler, config);
+ public int doProcessEvents() {
+
+ int result = 0;
+
+ for (NotificationQueue cur : queues.values()) {
+ result += doProcessEventsForQueue((MockNotificationQueue) cur);
+ }
+ return result;
+ }
+
+ private int doProcessEventsForQueue(final MockNotificationQueue queue) {
+
+
+ int result = 0;
+ final List<Notification> processedNotifications = new ArrayList<Notification>();
+ final List<Notification> oldNotifications = new ArrayList<Notification>();
+
+ List<Notification> readyNotifications = queue.getReadyNotifications();
+ for (final Notification cur : readyNotifications) {
+ final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
+ queue.getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getFutureUserToken(), cur.getAccountRecordId(), cur.getTenantRecordId());
+ final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
+ "MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
+ PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
+ cur.getNotificationKey(), cur.getUserToken(), cur.getFutureUserToken(), cur.getEffectiveDate(),
+ cur.getAccountRecordId(), cur.getTenantRecordId());
+ oldNotifications.add(cur);
+ processedNotifications.add(processedNotification);
+ result++;
+ }
+
+ queue.markProcessedNotifications(oldNotifications, processedNotifications);
+ return result;
}
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index 31834b2..d12cccf 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -24,9 +24,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.HandleCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -36,10 +34,8 @@ import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
-import com.ning.billing.util.config.NotificationConfig;
import com.ning.billing.util.entity.dao.EntitySqlDao;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionWrapper;
import com.ning.billing.util.entity.dao.EntitySqlDaoTransactionalJdbiWrapper;
@@ -65,8 +61,6 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
private final Logger log = LoggerFactory.getLogger(TestNotificationQueue.class);
- private static final UUID accountId = UUID.randomUUID();
-
private EntitySqlDaoTransactionalJdbiWrapper entitySqlDaoTransactionalJdbiWrapper;
@Inject
@@ -75,7 +69,8 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
@Inject
private Clock clock;
- private DummySqlTest dao;
+ @Inject
+ NotificationQueueService queueService;
private int eventsReceived;
@@ -97,28 +92,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
public int compareTo(TestNotificationKey arg0) {
return value.compareTo(arg0.value);
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(value);
+ return sb.toString();
+ }
}
@BeforeSuite(groups = "slow")
public void setup() throws Exception {
final String testDdl = IOUtils.toString(NotificationSqlDao.class.getResourceAsStream("/com/ning/billing/util/ddl_test.sql"));
helper.initDb(testDdl);
- dao = dbi.onDemand(DummySqlTest.class);
entitySqlDaoTransactionalJdbiWrapper = new EntitySqlDaoTransactionalJdbiWrapper(dbi);
}
@BeforeTest(groups = "slow")
public void beforeTest() {
- dbi.withHandle(new HandleCallback<Void>() {
-
- @Override
- public Void withHandle(final Handle handle) throws Exception {
- handle.execute("delete from notifications");
- handle.execute("delete from claimed_notifications");
- handle.execute("delete from dummy");
- return null;
- }
- });
// Reset time to real value
((ClockMock) clock).resetDeltaFromReality();
eventsReceived = 0;
@@ -135,20 +126,19 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- log.info("Handler received key: " + notificationKey);
-
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 1, 10000),
- new InternalCallContextFactory(dbi, clock));
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "foo",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey);
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ });
queue.startQueue();
@@ -161,50 +151,69 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
expectedNotifications.put(notificationKey, Boolean.FALSE);
// Insert dummy to be processed in 2 sec'
- entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
+ entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>()
+
+ {
@Override
- public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
+ public Void inTransaction(
+ final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws
+ Exception {
entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKey, internalCallContext);
+ queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKey, internalCallContext);
log.info("Posted key: " + notificationKey);
return null;
}
- });
+ }
+
+ );
// Move time in the future after the notification effectiveDate
- ((ClockMock) clock).setDeltaFromReality(3000);
+ ((ClockMock) clock).
+
+ setDeltaFromReality(3000);
// Notification should have kicked but give it at least a sec' for thread scheduling
- await().atMost(1, MINUTES).until(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return expectedNotifications.get(notificationKey);
- }
- });
+ await()
+
+ .
+
+ atMost(1, MINUTES)
+
+ .
+
+ until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws
+ Exception {
+ return expectedNotifications.get(notificationKey);
+ }
+ }
+
+ );
queue.stopQueue();
Assert.assertTrue(expectedNotifications.get(notificationKey));
}
@Test(groups = "slow")
- public void testManyNotifications() throws InterruptedException {
+ public void testManyNotifications() throws Exception {
final Map<NotificationKey, Boolean> expectedNotifications = new TreeMap<NotificationKey, Boolean>();
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- synchronized (expectedNotifications) {
- expectedNotifications.put(notificationKey, Boolean.TRUE);
- expectedNotifications.notify();
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
-
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "many",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ synchronized (expectedNotifications) {
+ log.info("Handler received key: " + notificationKey.toString());
+
+ expectedNotifications.put(notificationKey, Boolean.TRUE);
+ expectedNotifications.notify();
+ }
+ }
+ });
queue.startQueue();
final DateTime now = clock.getUTCNow();
@@ -217,7 +226,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DummyObject obj = new DummyObject("foo", key);
final int currentIteration = i;
- final NotificationKey notificationKey = new TestNotificationKey(key.toString());
+ final NotificationKey notificationKey = new TestNotificationKey(new Integer(i).toString());
expectedNotifications.put(notificationKey, Boolean.FALSE);
entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@@ -226,7 +235,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, now.plus((currentIteration + 1) * nextReadyTimeIncrementMs),
- accountId, notificationKey, internalCallContext);
+ notificationKey, internalCallContext);
return null;
}
});
@@ -255,7 +264,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
success = true;
break;
}
- //log.debug(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
+ log.info(String.format("BEFORE WAIT : Got %d notifications at time %s (real time %s)", completed.size(), clock.getUTCNow(), new DateTime()));
expectedNotifications.wait(1000);
}
} while (nbTry-- > 0);
@@ -281,40 +290,23 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final Map<NotificationKey, Boolean> expectedNotificationsFred = new TreeMap<NotificationKey, Boolean>();
final Map<NotificationKey, Boolean> expectedNotificationsBarney = new TreeMap<NotificationKey, Boolean>();
- final NotificationQueueService notificationQueueService = new DefaultNotificationQueueService(dbi, clock, new InternalCallContextFactory(dbi, clock));
-
- final NotificationConfig config = new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return false;
- }
-
+ final NotificationQueue queueFred = queueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
- public long getSleepTimeMs() {
- return 10;
- }
- };
-
- final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
log.info("Fred received key: " + notificationKey);
expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
- },
- config);
+ });
- final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
+ final NotificationQueue queueBarney = queueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
log.info("Barney received key: " + notificationKey);
expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
eventsReceived++;
}
- },
- config);
-
+ });
queueFred.startQueue();
// We don't start Barney so it can never pick up notifications
@@ -335,9 +327,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
entitySqlDaoWrapperFactory.transmogrify(DummySqlTest.class).insertDummy(obj);
- queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyFred, internalCallContext);
+ queueFred.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyFred, internalCallContext);
log.info("posted key: " + notificationKeyFred.toString());
- queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, accountId, notificationKeyBarney, internalCallContext);
+ queueBarney.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, readyTime, notificationKeyBarney, internalCallContext);
log.info("posted key: " + notificationKeyBarney.toString());
return null;
}
@@ -365,40 +357,24 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
Assert.assertFalse(expectedNotificationsFred.get(notificationKeyBarney));
}
- NotificationConfig getNotificationConfig(final boolean off, final long sleepTime, final int maxReadyEvents, final long claimTimeMs) {
- return new NotificationConfig() {
- @Override
- public boolean isNotificationProcessingOff() {
- return off;
- }
-
- @Override
- public long getSleepTimeMs() {
- return sleepTime;
- }
- };
- }
-
@Test(groups = "slow")
- public void testRemoveNotifications() throws InterruptedException {
+ public void testRemoveNotifications() throws Exception {
final UUID key = UUID.randomUUID();
final NotificationKey notificationKey = new TestNotificationKey(key.toString());
final UUID key2 = UUID.randomUUID();
final NotificationKey notificationKey2 = new TestNotificationKey(key2.toString());
- final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
- new NotificationQueueHandler() {
- @Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
- if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
- log.info("Received notification with key: " + notificationKey);
- eventsReceived++;
- }
- }
- },
- getNotificationConfig(false, 100, 10, 10000),
- new InternalCallContextFactory(dbi, clock));
-
+ final NotificationQueue queue = queueService.createNotificationQueue("test-svc",
+ "remove",
+ new NotificationQueueHandler() {
+ @Override
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final UUID userToken, final Long accountRecordId, final Long tenantRecordId) {
+ if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
+ log.info("Received notification with key: " + notificationKey);
+ eventsReceived++;
+ }
+ }
+ });
queue.startQueue();
final DateTime start = clock.getUTCNow().plusHours(1);
@@ -409,9 +385,9 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
entitySqlDaoTransactionalJdbiWrapper.execute(new EntitySqlDaoTransactionWrapper<Void>() {
@Override
public Void inTransaction(final EntitySqlDaoWrapperFactory<EntitySqlDao> entitySqlDaoWrapperFactory) throws Exception {
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), accountId, notificationKey, internalCallContext);
- queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), accountId, notificationKey2, internalCallContext);
+ queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+ queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(2 * nextReadyTimeIncrementMs), notificationKey, internalCallContext);
+ queue.recordFutureNotificationFromTransaction(entitySqlDaoWrapperFactory, start.plus(3 * nextReadyTimeIncrementMs), notificationKey2, internalCallContext);
return null;
}
});
@@ -436,16 +412,32 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
queue.stopQueue();
}
+ static NotificationQueueConfig getNotificationConfig(final boolean off, final long sleepTime) {
+ return new NotificationQueueConfig() {
+ @Override
+ public boolean isNotificationProcessingOff() {
+ return off;
+ }
+
+ @Override
+ public long getSleepTimeMs() {
+ return sleepTime;
+ }
+ };
+ }
+
public static class TestNotificationQueueModule extends AbstractModule {
@Override
protected void configure() {
- bind(Clock.class).to(ClockMock.class);
+ bind(Clock.class).to(ClockMock.class).asEagerSingleton();
final IDBI dbi = getDBI();
bind(IDBI.class).toInstance(dbi);
final IDBI otherDbi = getDBI();
bind(IDBI.class).annotatedWith(Names.named("global-lock")).toInstance(otherDbi);
+ bind(NotificationQueueService.class).to(DefaultNotificationQueueService.class).asEagerSingleton();
+ bind(NotificationQueueConfig.class).toInstance(getNotificationConfig(false, 100));
}
}
}