killbill-uncached
Changes
entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java 2(+1 -1)
entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java 6(+3 -3)
invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java 2(+1 -1)
junction/src/main/java/com/ning/billing/junction/plumbing/api/BlockingEntitlementUserApi.java 14(+8 -6)
Details
diff --git a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountUserApi.java b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountUserApi.java
index 10c4f7e..e192265 100644
--- a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountUserApi.java
+++ b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountUserApi.java
@@ -82,7 +82,7 @@ public class DefaultAccountUserApi implements AccountUserApi {
@Override
public Account getAccountById(final UUID id, final TenantContext context) throws AccountApiException {
- final Account account = accountDao.getById(id, internalCallContextFactory.createInternalTenantContext(id, context));
+ final Account account = accountDao.getById(id, internalCallContextFactory.createInternalTenantContext(context));
if (account == null) {
throw new AccountApiException(ErrorCode.ACCOUNT_DOES_NOT_EXIST_FOR_ID, id);
}
@@ -151,7 +151,7 @@ public class DefaultAccountUserApi implements AccountUserApi {
@Override
public List<AccountEmail> getEmails(final UUID accountId, final TenantContext context) {
- return accountEmailDao.getEmails(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return accountEmailDao.getEmails(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
diff --git a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
index 3854d11..f0d9563 100644
--- a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
@@ -103,19 +103,22 @@ public class AuditedAccountDao implements AccountDao {
transactionalDao.create(account, context);
- // Insert history
final Long recordId = accountSqlDao.getRecordId(account.getId().toString(), context);
+ // We need to re-hydrate the context with the account record id
+ final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(recordId, context);
+
+ // Insert history
final EntityHistory<Account> history = new EntityHistory<Account>(account.getId(), recordId, account, ChangeType.INSERT);
- accountSqlDao.insertHistoryFromTransaction(history, context);
+ accountSqlDao.insertHistoryFromTransaction(history, rehydratedContext);
// Insert audit
- final Long historyRecordId = accountSqlDao.getHistoryRecordId(recordId, context);
+ final Long historyRecordId = accountSqlDao.getHistoryRecordId(recordId, rehydratedContext);
final EntityAudit audit = new EntityAudit(TableName.ACCOUNT_HISTORY, historyRecordId, ChangeType.INSERT);
- accountSqlDao.insertAuditFromTransaction(audit, context);
+ accountSqlDao.insertAuditFromTransaction(audit, rehydratedContext);
- final AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, context.getUserToken());
+ final AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, rehydratedContext.getUserToken());
try {
- eventBus.postFromTransaction(creationEvent, transactionalDao, internalCallContextFactory.createInternalCallContext(recordId, context));
+ eventBus.postFromTransaction(creationEvent, transactionalDao, rehydratedContext);
} catch (EventBusException e) {
log.warn("Failed to post account creation event for account " + account.getId(), e);
}
diff --git a/account/src/main/resources/com/ning/billing/account/dao/AccountSqlDao.sql.stg b/account/src/main/resources/com/ning/billing/account/dao/AccountSqlDao.sql.stg
index bcf42ef..45e945f 100644
--- a/account/src/main/resources/com/ning/billing/account/dao/AccountSqlDao.sql.stg
+++ b/account/src/main/resources/com/ning/billing/account/dao/AccountSqlDao.sql.stg
@@ -146,12 +146,13 @@ auditFields(prefix) ::= <<
<prefix>reason_code,
<prefix>comments,
<prefix>user_token,
+ <prefix>account_record_id,
<prefix>tenant_record_id
>>
insertAuditFromTransaction() ::= <<
INSERT INTO audit_log(<auditFields()>)
- VALUES(:tableName, :recordId, :changeType, :createdDate, :userName, :reasonCode, :comment, :userToken, :tenantRecordId);
+ VALUES(:tableName, :recordId, :changeType, :createdDate, :userName, :reasonCode, :comment, :userToken, :accountRecordId, :tenantRecordId);
>>
test() ::= <<
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
index 229e59a..f7c6362 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/timeline/DefaultEntitlementTimelineApi.java
@@ -85,7 +85,7 @@ public class DefaultEntitlementTimelineApi implements EntitlementTimelineApi {
@Override
public BundleTimeline getBundleTimeline(final UUID accountId, final String bundleName, final TenantContext context)
throws EntitlementRepairException {
- final SubscriptionBundle bundle = dao.getSubscriptionBundleFromAccountAndKey(accountId, bundleName, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ final SubscriptionBundle bundle = dao.getSubscriptionBundleFromAccountAndKey(accountId, bundleName, internalCallContextFactory.createInternalTenantContext(context));
return getBundleTimelineInternal(bundle, bundleName + " [accountId= " + accountId.toString() + "]", context);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
index 6d32382..e91adb2 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/api/user/DefaultEntitlementUserApi.java
@@ -91,7 +91,7 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public SubscriptionBundle getBundleForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) throws EntitlementUserApiException {
- final SubscriptionBundle result = dao.getSubscriptionBundleFromAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ final SubscriptionBundle result = dao.getSubscriptionBundleFromAccountAndKey(accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
if (result == null) {
throw new EntitlementUserApiException(ErrorCode.ENT_GET_INVALID_BUNDLE_KEY, bundleKey);
}
@@ -106,12 +106,12 @@ public class DefaultEntitlementUserApi implements EntitlementUserApi {
@Override
public List<SubscriptionBundle> getBundlesForAccount(final UUID accountId, final TenantContext context) {
- return dao.getSubscriptionBundleForAccount(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getSubscriptionBundleForAccount(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
public List<Subscription> getSubscriptionsForAccountAndKey(final UUID accountId, final String bundleKey, final TenantContext context) {
- return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getSubscriptionsForAccountAndKey(subscriptionFactory, accountId, bundleKey, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index e0f5380..c98131e 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -118,14 +118,14 @@ public class Engine implements EventListener, EntitlementService {
try {
final NotificationQueueHandler queueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
if (!(inputKey instanceof EntitlementNotificationKey)) {
log.error("Entitlement service received an unexpected event type {}" + inputKey.getClass().getName());
return;
}
final EntitlementNotificationKey key = (EntitlementNotificationKey) inputKey;
- final EntitlementEvent event = dao.getEventById(key.getEventId(), internalCallContextFactory.createInternalTenantContext());
+ final EntitlementEvent event = dao.getEventById(key.getEventId(), internalCallContextFactory.createInternalTenantContext(tenantRecordId, accountRecordId));
if (event == null) {
log.warn("Failed to extract event for notification key {}", inputKey);
return;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/invoice/DefaultInvoicePaymentApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/invoice/DefaultInvoicePaymentApi.java
index 88b2425..4739213 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/invoice/DefaultInvoicePaymentApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/invoice/DefaultInvoicePaymentApi.java
@@ -66,7 +66,7 @@ public class DefaultInvoicePaymentApi implements InvoicePaymentApi {
@Override
public List<Invoice> getAllInvoicesByAccount(final UUID accountId, final TenantContext context) {
- return dao.getAllInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getAllInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
@@ -119,7 +119,7 @@ public class DefaultInvoicePaymentApi implements InvoicePaymentApi {
@Override
public List<InvoicePayment> getChargebacksByAccountId(final UUID accountId, final TenantContext context) {
- return dao.getChargebacksByAccountId(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getChargebacksByAccountId(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index 4be08a1..bc9a07a 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -85,12 +85,12 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
@Override
public List<Invoice> getInvoicesByAccount(final UUID accountId, final TenantContext context) {
- return dao.getInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
public List<Invoice> getInvoicesByAccount(final UUID accountId, final LocalDate fromDate, final TenantContext context) {
- return dao.getInvoicesByAccount(accountId, fromDate, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getInvoicesByAccount(accountId, fromDate, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
@@ -102,13 +102,13 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
@Override
public BigDecimal getAccountBalance(final UUID accountId, final TenantContext context) {
- final BigDecimal result = dao.getAccountBalance(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ final BigDecimal result = dao.getAccountBalance(accountId, internalCallContextFactory.createInternalTenantContext(context));
return result == null ? BigDecimal.ZERO : result;
}
@Override
public BigDecimal getAccountCBA(final UUID accountId, final TenantContext context) {
- final BigDecimal result = dao.getAccountCBA(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ final BigDecimal result = dao.getAccountCBA(accountId, internalCallContextFactory.createInternalTenantContext(context));
return result == null ? BigDecimal.ZERO : result;
}
@@ -124,7 +124,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
@Override
public List<Invoice> getUnpaidInvoicesByAccountId(final UUID accountId, final LocalDate upToDate, final TenantContext context) {
- return dao.getUnpaidInvoicesByAccountId(accountId, upToDate, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return dao.getUnpaidInvoicesByAccountId(accountId, upToDate, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index 911b2f1..bd00de0 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -159,7 +159,7 @@ public class InvoiceDispatcher {
final Account account = accountApi.getAccountById(accountId, internalCallContext);
List<Invoice> invoices = new ArrayList<Invoice>();
if (!billingEvents.isAccountAutoInvoiceOff()) {
- invoices = invoiceDao.getInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(account.getId(), context)); //no need to fetch, invoicing is off on this account
+ invoices = invoiceDao.getInvoicesByAccount(accountId, internalCallContextFactory.createInternalTenantContext(context)); //no need to fetch, invoicing is off on this account
}
final Currency targetCurrency = account.getCurrency();
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
index 47716df..1cb11d5 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/DefaultNextBillingDateNotifier.java
@@ -82,7 +82,7 @@ public class DefaultNextBillingDateNotifier implements NextBillingDateNotifier {
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
try {
if (!(notificationKey instanceof NextBillingDateNotificationKey)) {
log.error("Invoice service received an unexpected event type {}", notificationKey.getClass().getName());
diff --git a/invoice/src/main/java/com/ning/billing/invoice/notification/EmailInvoiceNotifier.java b/invoice/src/main/java/com/ning/billing/invoice/notification/EmailInvoiceNotifier.java
index c42bba9..d86c6e9 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/notification/EmailInvoiceNotifier.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/notification/EmailInvoiceNotifier.java
@@ -65,7 +65,6 @@ public class EmailInvoiceNotifier implements InvoiceNotifier {
@Override
public void notify(final Account account, final Invoice invoice, final TenantContext context) throws InvoiceApiException {
-
final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(context);
final List<String> to = new ArrayList<String>();
to.add(account.getEmail());
diff --git a/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
index 4ca9713..fd40668 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
@@ -173,7 +173,7 @@ public class TestInvoiceDispatcher extends InvoicingTestBase {
Invoice invoice = dispatcher.processAccount(accountId, target, true, callContext);
Assert.assertNotNull(invoice);
- final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(accountId, callContext);
+ final InternalTenantContext internalTenantContext = internalCallContextFactory.createInternalTenantContext(callContext);
List<Invoice> invoices = invoiceDao.getInvoicesByAccount(accountId, internalTenantContext);
Assert.assertEquals(invoices.size(), 0);
diff --git a/junction/src/main/java/com/ning/billing/junction/plumbing/api/BlockingEntitlementUserApi.java b/junction/src/main/java/com/ning/billing/junction/plumbing/api/BlockingEntitlementUserApi.java
index f6694bd..8c7f0ff 100644
--- a/junction/src/main/java/com/ning/billing/junction/plumbing/api/BlockingEntitlementUserApi.java
+++ b/junction/src/main/java/com/ning/billing/junction/plumbing/api/BlockingEntitlementUserApi.java
@@ -35,6 +35,7 @@ import com.ning.billing.junction.api.BlockingApiException;
import com.ning.billing.junction.block.BlockingChecker;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.glue.RealImplementation;
import com.ning.billing.util.svcapi.junction.BlockingApi;
@@ -131,8 +132,9 @@ public class BlockingEntitlementUserApi implements EntitlementUserApi {
public SubscriptionBundle createBundleForAccount(final UUID accountId, final String bundleKey, final CallContext context)
throws EntitlementUserApiException {
try {
- checker.checkBlockedChange(accountId, Blockable.Type.ACCOUNT, internalCallContextFactory.createInternalTenantContext(accountId, context));
- return new BlockingSubscriptionBundle(entitlementUserApi.createBundleForAccount(accountId, bundleKey, context), blockingApi, internalCallContextFactory.createInternalTenantContext(context));
+ final InternalTenantContext internalContext = internalCallContextFactory.createInternalTenantContext(context);
+ checker.checkBlockedChange(accountId, Blockable.Type.ACCOUNT, internalContext);
+ return new BlockingSubscriptionBundle(entitlementUserApi.createBundleForAccount(accountId, bundleKey, context), blockingApi, internalContext);
} catch (BlockingApiException e) {
throw new EntitlementUserApiException(e, e.getCode(), e.getMessage());
}
@@ -142,10 +144,10 @@ public class BlockingEntitlementUserApi implements EntitlementUserApi {
public Subscription createSubscription(final UUID bundleId, final PlanPhaseSpecifier spec, final DateTime requestedDate,
final CallContext context) throws EntitlementUserApiException {
try {
- // Retrieve the bundle to get the account id for the internal call context
- final SubscriptionBundle bundle = entitlementUserApi.getBundleFromId(bundleId, context);
- checker.checkBlockedChange(bundleId, Blockable.Type.SUBSCRIPTION_BUNDLE, internalCallContextFactory.createInternalTenantContext(bundle.getAccountId(), context));
- return new BlockingSubscription(entitlementUserApi.createSubscription(bundleId, spec, requestedDate, context), blockingApi, checker, internalCallContextFactory.createInternalTenantContext(context), internalCallContextFactory);
+ final InternalTenantContext internalContext = internalCallContextFactory.createInternalTenantContext(context);
+ checker.checkBlockedChange(bundleId, Blockable.Type.SUBSCRIPTION_BUNDLE, internalContext);
+ return new BlockingSubscription(entitlementUserApi.createSubscription(bundleId, spec, requestedDate, context), blockingApi, checker, internalContext, internalCallContextFactory);
+
} catch (BlockingApiException e) {
throw new EntitlementUserApiException(e, e.getCode(), e.getMessage());
}
diff --git a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
index c1fa16c..dec4388 100644
--- a/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
+++ b/overdue/src/main/java/com/ning/billing/ovedue/notification/DefaultOverdueCheckNotifier.java
@@ -21,7 +21,6 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Inject;
import com.ning.billing.config.NotificationConfig;
import com.ning.billing.overdue.OverdueProperties;
import com.ning.billing.overdue.listener.OverdueListener;
@@ -33,6 +32,8 @@ import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotifi
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueHandler;
+import com.google.inject.Inject;
+
public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
private static final Logger log = LoggerFactory.getLogger(DefaultOverdueCheckNotifier.class);
@@ -68,7 +69,7 @@ public class DefaultOverdueCheckNotifier implements OverdueCheckNotifier {
final NotificationQueueHandler notificationQueueHandler = new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDate, final Long accountRecordId, final Long tenantRecordId) {
try {
if (!(notificationKey instanceof OverdueCheckNotificationKey)) {
log.error("Overdue service received Unexpected notificationKey {}", notificationKey.getClass().getName());
diff --git a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
index 6256c43..ce28911 100644
--- a/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
+++ b/payment/src/main/java/com/ning/billing/payment/api/DefaultPaymentApi.java
@@ -83,7 +83,7 @@ public class DefaultPaymentApi implements PaymentApi {
@Override
public List<Payment> getAccountPayments(final UUID accountId, final TenantContext context)
throws PaymentApiException {
- return paymentProcessor.getAccountPayments(accountId, internalCallContextFactory.createInternalTenantContext(accountId, context));
+ return paymentProcessor.getAccountPayments(accountId, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
@@ -129,7 +129,7 @@ public class DefaultPaymentApi implements PaymentApi {
@Override
public List<Refund> getAccountRefunds(final Account account, final TenantContext context)
throws PaymentApiException {
- return refundProcessor.getAccountRefunds(account, internalCallContextFactory.createInternalTenantContext(account.getId(), context));
+ return refundProcessor.getAccountRefunds(account, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
@@ -167,7 +167,7 @@ public class DefaultPaymentApi implements PaymentApi {
@Override
public List<PaymentMethod> getPaymentMethods(final Account account, final boolean withPluginDetail, final TenantContext context)
throws PaymentApiException {
- return methodProcessor.getPaymentMethods(account, withPluginDetail, internalCallContextFactory.createInternalTenantContext(account.getId(), context));
+ return methodProcessor.getPaymentMethods(account, withPluginDetail, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
@@ -179,7 +179,7 @@ public class DefaultPaymentApi implements PaymentApi {
@Override
public PaymentMethod getPaymentMethod(final Account account, final UUID paymentMethod, final boolean withPluginDetail, final TenantContext context)
throws PaymentApiException {
- return methodProcessor.getPaymentMethod(account, paymentMethod, withPluginDetail, internalCallContextFactory.createInternalTenantContext(account.getId(), context));
+ return methodProcessor.getPaymentMethod(account, paymentMethod, withPluginDetail, internalCallContextFactory.createInternalTenantContext(context));
}
@Override
diff --git a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
index de63936..e049c4f 100644
--- a/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
+++ b/payment/src/main/java/com/ning/billing/payment/retry/BaseRetryService.java
@@ -65,13 +65,13 @@ public abstract class BaseRetryService implements RetryService {
getQueueName(),
new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
if (!(notificationKey instanceof PaymentRetryNotificationKey)) {
log.error("Payment service got an unexpected notification type {}", notificationKey.getClass().getName());
return;
}
final PaymentRetryNotificationKey key = (PaymentRetryNotificationKey) notificationKey;
- final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ final InternalCallContext callContext = internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, PAYMENT_RETRY_SERVICE, CallOrigin.INTERNAL, UserType.SYSTEM, null);
retry(key.getUuidKey(), callContext);
}
},
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
index 8d8080a..c77521d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/BusEventEntry.java
@@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+
package com.ning.billing.util.bus.dao;
import org.joda.time.DateTime;
@@ -28,8 +29,12 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
private final PersistentQueueEntryLifecycleState processingState;
private final String busEventClass;
private final String busEventJson;
+ private final Long accountRecordId;
+ private final Long tenantRecordId;
- public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable, final PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson) {
+ public BusEventEntry(final long id, final String createdOwner, final String owner, final DateTime nextAvailable,
+ final PersistentQueueEntryLifecycleState processingState, final String busEventClass, final String busEventJson,
+ final Long accountRecordId, final Long tenantRecordId) {
this.id = id;
this.createdOwner = createdOwner;
this.owner = owner;
@@ -37,13 +42,15 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
this.processingState = processingState;
this.busEventClass = busEventClass;
this.busEventJson = busEventJson;
+ this.accountRecordId = accountRecordId;
+ this.tenantRecordId = tenantRecordId;
}
- public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson) {
- this(0, createdOwner, null, null, null, busEventClass, busEventJson);
+ public BusEventEntry(final String createdOwner, final String busEventClass, final String busEventJson,
+ final Long accountRecordId, final Long tenantRecordId) {
+ this(0, createdOwner, null, null, null, busEventClass, busEventJson, accountRecordId, tenantRecordId);
}
-
public long getId() {
return id;
}
@@ -90,8 +97,18 @@ public class BusEventEntry implements PersistentQueueEntryLifecycle {
case PROCESSED:
return false;
default:
- throw new RuntimeException(String.format("Unkwnon IEvent processing state %s", processingState));
+ throw new RuntimeException(String.format("Unknown IEvent processing state %s", processingState));
}
return true;
}
+
+ @Override
+ public Long getAccountRecordId() {
+ return accountRecordId;
+ }
+
+ @Override
+ public Long getTenantRecordId() {
+ return tenantRecordId;
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
index c80fcba..e9e70de 100644
--- a/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/bus/dao/PersistentBusSqlDao.java
@@ -13,6 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
+
package com.ning.billing.util.bus.dao;
import java.sql.ResultSet;
@@ -42,7 +43,6 @@ import com.ning.billing.util.queue.PersistentQueueEntryLifecycle.PersistentQueue
@ExternalizedSqlViaStringTemplate3()
public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>, CloseMe {
-
@SqlQuery
@Mapper(PersistentBusSqlMapper.class)
public BusEventEntry getNextBusEventEntry(@Bind("max") int max,
@@ -103,8 +103,11 @@ public interface PersistentBusSqlDao extends Transactional<PersistentBusSqlDao>,
final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
final String processingOwner = r.getString("processing_owner");
final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
+ final Long accountRecordId = r.getLong("account_record_id");
+ final Long tenantRecordId = r.getLong("tenant_record_id");
- return new BusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState, className, eventJson);
+ return new BusEventEntry(recordId, createdOwner, processingOwner, nextAvailableDate, processingState, className,
+ eventJson, accountRecordId, tenantRecordId);
}
}
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index be503b7..f3e4519 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -135,9 +135,11 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
return Collections.emptyList();
}
- final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, input.getId(), now, context) == 1);
+ // We need to re-hydrate the context with the record ids from the BusEventEntry
+ final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(input.getTenantRecordId(), input.getAccountRecordId(), context);
+ final boolean claimed = (dao.claimBusEvent(hostname, nextAvailable, input.getId(), now, rehydratedContext) == 1);
if (claimed) {
- dao.insertClaimedHistory(hostname, now, input.getId(), context);
+ dao.insertClaimedHistory(hostname, now, input.getId(), rehydratedContext);
return Collections.singletonList(input);
}
return Collections.emptyList();
@@ -175,7 +177,7 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
private void postFromTransaction(final BusEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
try {
final String json = objectMapper.writeValueAsString(event);
- final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json);
+ final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json, context.getAccountRecordId(), context.getTenantRecordId());
transactional.insertBusEvent(entry, context);
} catch (Exception e) {
log.error("Failed to post BusEvent " + event, e);
diff --git a/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java b/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
index eb3dea1..17ea0a5 100644
--- a/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
+++ b/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
@@ -26,8 +26,6 @@ import javax.inject.Inject;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.dao.ObjectType;
@@ -35,9 +33,6 @@ import com.ning.billing.util.dao.TableName;
public class InternalCallContextFactory {
- private static final Logger log = LoggerFactory.getLogger(InternalCallContextFactory.class);
-
- public static final UUID INTERNAL_TENANT_ID = new UUID(0L, 0L);
public static final long INTERNAL_TENANT_RECORD_ID = 0L;
private final IDBI dbi;
@@ -51,35 +46,29 @@ public class InternalCallContextFactory {
this.clock = clock;
}
- // Internal use only (notification queue, etc.) - no tenant for now
- public InternalTenantContext createInternalTenantContext() {
- return createInternalTenantContext(INTERNAL_TENANT_RECORD_ID, null);
- }
-
// Used for r/o operations - we don't need the account id in that case
public InternalTenantContext createInternalTenantContext(final TenantContext context) {
return createInternalTenantContext(getTenantRecordId(context), null);
}
- // Used for r/w operations - we need the account id to populate the account_record_id field
- public InternalTenantContext createInternalTenantContext(final UUID accountId, final TenantContext context) {
- return createInternalTenantContext(getTenantRecordId(context), getAccountRecordId(accountId));
- }
-
- // Internal use only (notification queue, etc.) - no tenant for now
- public InternalCallContext createInternalCallContext(final String userName, final CallOrigin callOrigin, final UserType userType, @Nullable final UUID userToken) {
- return createInternalCallContext(INTERNAL_TENANT_RECORD_ID, null, new DefaultCallContext(INTERNAL_TENANT_ID, userName, callOrigin, userType, userToken, clock));
+ // Used by notification queue and persistent bus - accountRecordId is expected to be non null
+ public InternalCallContext createInternalCallContext(final Long tenantRecordId, final Long accountRecordId, final String userName,
+ final CallOrigin callOrigin, final UserType userType, @Nullable final UUID userToken) {
+ return new InternalCallContext(tenantRecordId, accountRecordId, userToken, userName, callOrigin, userType, null, null, clock.getUTCNow(), clock.getUTCNow());
}
public InternalCallContext createInternalCallContext(final UUID objectId, final ObjectType objectType, final String userName,
final CallOrigin callOrigin, final UserType userType, @Nullable final UUID userToken) {
- // TODO retrieve the same way the tenant
- return createInternalCallContext(objectId, objectType, new DefaultCallContext(INTERNAL_TENANT_ID, userName, callOrigin, userType, userToken, clock));
+ final Long tenantRecordId = retrieveTenantRecordIdFromObject(objectId, objectType);
+ final Long accountRecordId = retrieveAccountRecordIdFromObject(objectId, objectType);
+ return createInternalCallContext(tenantRecordId, accountRecordId, userName, callOrigin, userType, userToken);
}
- public InternalCallContext createInternalCallContext(final UUID accountId, final String userName, final CallOrigin callOrigin, final UserType userType, @Nullable final UUID userToken) {
- // TODO retrieve the same way the tenant
- return createInternalCallContext(accountId, new DefaultCallContext(INTERNAL_TENANT_ID, userName, callOrigin, userType, userToken, clock));
+ public InternalCallContext createInternalCallContext(final UUID accountId, final String userName, final CallOrigin callOrigin,
+ final UserType userType, @Nullable final UUID userToken) {
+ final Long tenantRecordId = retrieveTenantRecordIdFromObject(accountId, ObjectType.ACCOUNT);
+ final Long accountRecordId = getAccountRecordId(accountId);
+ return createInternalCallContext(tenantRecordId, accountRecordId, userName, callOrigin, userType, userToken);
}
/**
@@ -91,38 +80,16 @@ public class InternalCallContextFactory {
* @return internal call context from context, with a non null account_record_id (if found)
*/
public InternalCallContext createInternalCallContext(final UUID objectId, final ObjectType objectType, final CallContext context) {
- final Long accountRecordId;
-
- final TableName tableName = TableName.fromObjectType(objectType);
- if (tableName != null) {
- accountRecordId = dbi.withHandle(new HandleCallback<Long>() {
- @Override
- public Long withHandle(final Handle handle) throws Exception {
- final String columnName;
- if (TableName.TAG_DEFINITIONS.equals(tableName) || TableName.TAG_DEFINITION_HISTORY.equals(tableName)) {
- // Not tied to an account
- return null;
- } else if (TableName.ACCOUNT.equals(tableName) || TableName.ACCOUNT_HISTORY.equals(tableName)) {
- // Lookup the record_id directly
- columnName = "record_id";
- } else {
- // The table should have an account_record_id column
- columnName = "account_record_id";
- }
-
- final List<Map<String, Object>> values = handle.select(String.format("select %s from %s where id = ?;", columnName, tableName.getTableName()), objectId.toString());
- if (values.size() == 0) {
- return null;
- } else {
- return (Long) values.get(0).get(columnName);
- }
- }
- });
+ // Don't trust the context to have populated correctly the tenant...
+ final Long tenantRecordId;
+ if (context.getTenantId() == null) {
+ tenantRecordId = retrieveTenantRecordIdFromObject(objectId, objectType);
} else {
- accountRecordId = null;
+ tenantRecordId = getTenantRecordId(context);
}
- return createInternalCallContext(getTenantRecordId(context), accountRecordId, context);
+ final Long accountRecordId = retrieveAccountRecordIdFromObject(objectId, objectType);
+ return createInternalCallContext(tenantRecordId, accountRecordId, context);
}
// Used for update/delete operations - we don't need the account id in that case
@@ -136,7 +103,7 @@ public class InternalCallContextFactory {
return createInternalCallContext(getTenantRecordId(context), getAccountRecordId(accountId), context);
}
- private InternalTenantContext createInternalTenantContext(final Long tenantRecordId, @Nullable final Long accountRecordId) {
+ public InternalTenantContext createInternalTenantContext(final Long tenantRecordId, @Nullable final Long accountRecordId) {
return new InternalTenantContext(tenantRecordId, accountRecordId);
}
@@ -151,6 +118,13 @@ public class InternalCallContextFactory {
context.getCreatedDate(), context.getUpdatedDate());
}
+ // Used when we need to re-hydrate the context with the tenant_record_id and account_record_id (when claiming bus events)
+ public InternalCallContext createInternalCallContext(final Long tenantRecordId, final Long accountRecordId, final InternalCallContext context) {
+ return new InternalCallContext(tenantRecordId, accountRecordId, context.getUserToken(), context.getUserName(),
+ context.getCallOrigin(), context.getUserType(), context.getReasonCode(), context.getComment(),
+ context.getCreatedDate(), context.getUpdatedDate());
+ }
+
private Long getTenantRecordId(final TenantContext context) {
// Default to single default tenant (e.g. single tenant mode)
if (context.getTenantId() == null) {
@@ -163,4 +137,74 @@ public class InternalCallContextFactory {
private Long getAccountRecordId(final UUID accountId) {
return callContextSqlDao.getAccountRecordId(accountId.toString());
}
+
+ private Long retrieveAccountRecordIdFromObject(final UUID objectId, final ObjectType objectType) {
+ final Long accountRecordId;
+
+ final TableName tableName = TableName.fromObjectType(objectType);
+ if (tableName != null) {
+ accountRecordId = dbi.withHandle(new HandleCallback<Long>() {
+ @Override
+ public Long withHandle(final Handle handle) throws Exception {
+ final String columnName;
+ if (TableName.TAG_DEFINITIONS.equals(tableName) || TableName.TAG_DEFINITION_HISTORY.equals(tableName)) {
+ // Not tied to an account
+ return null;
+ } else if (TableName.ACCOUNT.equals(tableName) || TableName.ACCOUNT_HISTORY.equals(tableName)) {
+ // Lookup the record_id directly
+ columnName = "record_id";
+ } else {
+ // The table should have an account_record_id column
+ columnName = "account_record_id";
+ }
+
+ final List<Map<String, Object>> values = handle.select(String.format("select %s from %s where id = ?;", columnName, tableName.getTableName()), objectId.toString());
+ if (values.size() == 0) {
+ return null;
+ } else {
+ return (Long) values.get(0).get(columnName);
+ }
+ }
+ });
+ } else {
+ accountRecordId = null;
+ }
+ return accountRecordId;
+ }
+
+ private Long retrieveTenantRecordIdFromObject(final UUID objectId, final ObjectType objectType) {
+ final Long tenantRecordId;
+
+ final TableName tableName = TableName.fromObjectType(objectType);
+ if (tableName != null) {
+ tenantRecordId = dbi.withHandle(new HandleCallback<Long>() {
+ @Override
+ public Long withHandle(final Handle handle) throws Exception {
+ final String columnName;
+ if (TableName.TENANT.equals(tableName)) {
+ // Lookup the record_id directly
+ columnName = "record_id";
+ } else {
+ // The table should have an tenant_record_id column
+ columnName = "tenant_record_id";
+ }
+
+ final List<Map<String, Object>> values = handle.select(String.format("select %s from %s where id = ?;", columnName, tableName.getTableName()), objectId.toString());
+ if (values.size() == 0) {
+ return null;
+ } else {
+ return (Long) values.get(0).get(columnName);
+ }
+ }
+ });
+ } else {
+ tenantRecordId = null;
+ }
+ return tenantRecordId;
+ }
+
+ // TODO - remove
+ public InternalCallContext createInternalCallContext(final String userName, final CallOrigin callOrigin, final UserType userType, final UUID userToken) {
+ return createInternalCallContext(INTERNAL_TENANT_RECORD_ID, null, userName, callOrigin, userType, userToken);
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
index 0398a68..811846e 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/dao/NotificationSqlDao.java
@@ -95,6 +95,7 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
@InternalTenantContextBinder final InternalCallContext context);
public static class NotificationSqlDaoBinder extends BinderBase implements Binder<Bind, Notification> {
+
@Override
public void bind(@SuppressWarnings("rawtypes") final SQLStatement stmt, final Bind bind, final Notification evt) {
stmt.bind("id", evt.getId().toString());
@@ -111,8 +112,8 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
}
}
-
public static class NotificationSqlMapper extends MapperBase implements ResultSetMapper<Notification> {
+
@Override
public Notification map(final int index, final ResultSet r, final StatementContext ctx)
throws SQLException {
@@ -128,10 +129,12 @@ public interface NotificationSqlDao extends Transactional<NotificationSqlDao>, C
final DateTime nextAvailableDate = getDateTime(r, "processing_available_date");
final String processingOwner = r.getString("processing_owner");
final PersistentQueueEntryLifecycleState processingState = PersistentQueueEntryLifecycleState.valueOf(r.getString("processing_state"));
+ final Long accountRecordId = r.getLong("account_record_id");
+ final Long tenantRecordId = r.getLong("tenant_record_id");
return new DefaultNotification(ordering, id, createdOwner, processingOwner, queueName, nextAvailableDate,
- processingState, className, notificationKey, accountId, effectiveDate);
-
+ processingState, className, notificationKey, accountId, effectiveDate,
+ accountRecordId, tenantRecordId);
}
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
index 70c49c4..848ed9a 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotification.java
@@ -23,6 +23,7 @@ import org.joda.time.DateTime;
import com.ning.billing.util.entity.EntityBase;
public class DefaultNotification extends EntityBase implements Notification {
+
private final long ordering;
private final String owner;
private final String createdOwner;
@@ -33,11 +34,13 @@ public class DefaultNotification extends EntityBase implements Notification {
private final String notificationKey;
private final DateTime effectiveDate;
private final UUID accountId;
+ private final Long accountRecordId;
+ private final Long tenantRecordId;
-
- public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName, final DateTime nextAvailableDate,
- final PersistentQueueEntryLifecycleState lifecycleState,
- final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate) {
+ public DefaultNotification(final long ordering, final UUID id, final String createdOwner, final String owner, final String queueName,
+ final DateTime nextAvailableDate, final PersistentQueueEntryLifecycleState lifecycleState,
+ final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate,
+ final Long accountRecordId, final Long tenantRecordId) {
super(id);
this.ordering = ordering;
this.owner = owner;
@@ -49,10 +52,15 @@ public class DefaultNotification extends EntityBase implements Notification {
this.notificationKey = notificationKey;
this.accountId = accountId;
this.effectiveDate = effectiveDate;
+ this.accountRecordId = accountRecordId;
+ this.tenantRecordId = tenantRecordId;
}
- public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass, final String notificationKey, final UUID accountId, final DateTime effectiveDate) {
- this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE, notificationKeyClass, notificationKey, accountId, effectiveDate);
+ public DefaultNotification(final String queueName, final String createdOwner, final String notificationKeyClass,
+ final String notificationKey, final UUID accountId, final DateTime effectiveDate,
+ final Long accountRecordId, final Long tenantRecordId) {
+ this(-1L, UUID.randomUUID(), createdOwner, null, queueName, null, PersistentQueueEntryLifecycleState.AVAILABLE,
+ notificationKeyClass, notificationKey, accountId, effectiveDate, accountRecordId, tenantRecordId);
}
@Override
@@ -99,7 +107,6 @@ public class DefaultNotification extends EntityBase implements Notification {
return notificationKeyClass;
}
-
@Override
public String getNotificationKey() {
return notificationKey;
@@ -124,4 +131,14 @@ public class DefaultNotification extends EntityBase implements Notification {
public UUID getAccountId() {
return accountId;
}
+
+ @Override
+ public Long getAccountRecordId() {
+ return accountRecordId;
+ }
+
+ @Override
+ public Long getTenantRecordId() {
+ return tenantRecordId;
+ }
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
index 197e366..3d78f33 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/DefaultNotificationQueue.java
@@ -22,6 +22,8 @@ import java.util.Date;
import java.util.List;
import java.util.UUID;
+import javax.annotation.Nullable;
+
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
@@ -54,10 +56,9 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
@Override
public int doProcessEvents() {
- final InternalCallContext context = createCallContext();
-
logDebug("ENTER doProcessEvents");
- final List<Notification> notifications = getReadyNotifications(context);
+ // Finding and claiming notifications is not done per tenant (yet?)
+ final List<Notification> notifications = getReadyNotifications(createCallContext(null, null));
if (notifications.size() == 0) {
logDebug("EXIT doProcessEvents");
return 0;
@@ -70,9 +71,9 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
getNbProcessedEvents().incrementAndGet();
logDebug("handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate());
+ getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
result++;
- clearNotification(cur, context);
+ clearNotification(cur, createCallContext(cur.getTenantRecordId(), cur.getAccountRecordId()));
logDebug("done handling notification %s, key = %s for time %s", cur.getId(), cur.getNotificationKey(), cur.getEffectiveDate());
}
@@ -103,7 +104,8 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
final NotificationSqlDao thisDao,
final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime);
+ final Notification notification = new DefaultNotification(getFullQName(), getHostname(), notificationKey.getClass().getName(), json,
+ accountId, futureNotificationTime, context.getAccountRecordId(), context.getTenantRecordId());
thisDao.insertNotification(notification, context);
}
@@ -162,7 +164,7 @@ public class DefaultNotificationQueue extends NotificationQueueBase {
dao.removeNotification(notificationId.toString(), context);
}
- private InternalCallContext createCallContext() {
- return internalCallContextFactory.createInternalCallContext("NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
+ private InternalCallContext createCallContext(@Nullable final Long tenantRecordId, @Nullable final Long accountRecordId) {
+ return internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, null);
}
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
index 8e578c9..a790d15 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/Notification.java
@@ -24,6 +24,7 @@ import com.ning.billing.util.entity.Entity;
import com.ning.billing.util.queue.PersistentQueueEntryLifecycle;
public interface Notification extends PersistentQueueEntryLifecycle, Entity {
+
public Long getOrdering();
public String getNotificationKeyClass();
@@ -34,5 +35,6 @@ public interface Notification extends PersistentQueueEntryLifecycle, Entity {
public String getQueueName();
+ // TODO - do we still need it now we have account_record_id?
public UUID getAccountId();
}
diff --git a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
index 3b3d74d..a2d39dd 100644
--- a/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
+++ b/util/src/main/java/com/ning/billing/util/notificationq/NotificationQueueService.java
@@ -20,19 +20,22 @@ import org.joda.time.DateTime;
import com.ning.billing.config.NotificationConfig;
-
public interface NotificationQueueService {
public interface NotificationQueueHandler {
+
/**
* Called for each notification ready
*
* @param notificationKey the notification key associated to that notification entry
+ * @param accountRecordId account record id associated with that notification entry
+ * @param tenantRecordId tenant record id associated with that notification entry
*/
- public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime);
+ public void handleReadyNotification(NotificationKey notificationKey, DateTime eventDateTime, Long accountRecordId, Long tenantRecordId);
}
public static final class NotificationQueueAlreadyExists extends Exception {
+
private static final long serialVersionUID = 1541281L;
public NotificationQueueAlreadyExists(final String msg) {
@@ -41,6 +44,7 @@ public interface NotificationQueueService {
}
public static final class NoSuchNotificationQueue extends Exception {
+
private static final long serialVersionUID = 1541281L;
public NoSuchNotificationQueue(final String msg) {
@@ -84,7 +88,6 @@ public interface NotificationQueueService {
public void deleteNotificationQueue(final String svcName, final String queueName)
throws NoSuchNotificationQueue;
-
/**
* @param services
* @return the number of processed notifications
diff --git a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
index 6382faa..ea4ab88 100644
--- a/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
+++ b/util/src/main/java/com/ning/billing/util/queue/PersistentQueueEntryLifecycle.java
@@ -18,7 +18,6 @@ package com.ning.billing.util.queue;
import org.joda.time.DateTime;
-
public interface PersistentQueueEntryLifecycle {
public enum PersistentQueueEntryLifecycleState {
@@ -28,6 +27,10 @@ public interface PersistentQueueEntryLifecycle {
REMOVED
}
+ public Long getTenantRecordId();
+
+ public Long getAccountRecordId();
+
public String getOwner();
public String getCreatedOwner();
@@ -36,6 +39,5 @@ public interface PersistentQueueEntryLifecycle {
public PersistentQueueEntryLifecycleState getProcessingState();
-
public boolean isAvailableForProcessing(DateTime now);
}
diff --git a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
index c582bba..5d67ee3 100644
--- a/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/bus/dao/PersistentBusSqlDao.sql.stg
@@ -13,6 +13,8 @@ getNextBusEventEntry() ::= <<
, processing_owner
, processing_available_date
, processing_state
+ , account_record_id
+ , tenant_record_id
from bus_events
where
processing_state != 'PROCESSED'
diff --git a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
index 5bc3692..8287ea4 100644
--- a/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
+++ b/util/src/main/resources/com/ning/billing/util/notificationq/dao/NotificationSqlDao.sql.stg
@@ -17,6 +17,8 @@ getReadyNotifications() ::= <<
, processing_owner
, processing_available_date
, processing_state
+ , account_record_id
+ , tenant_record_id
from notifications
FORCE INDEX (idx_comp_where)
where
@@ -47,6 +49,8 @@ getNotificationForAccountAndDate() ::= <<
, processing_owner
, processing_available_date
, processing_state
+ , account_record_id
+ , tenant_record_id
from notifications
where
account_id = :accountId AND effective_date = :effectiveDate
diff --git a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
index e61b746..04d3d05 100644
--- a/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
+++ b/util/src/test/java/com/ning/billing/dbi/MysqlTestingHelper.java
@@ -192,8 +192,10 @@ public class MysqlTestingHelper {
initDb("drop table if exists tenants; create table tenants(record_id int(11) unsigned not null auto_increment, id char(36) not null, primary key(record_id)) engine=innodb;");
// We always want the basic tables when we do account_record_id lookups (e.g. for custom fields, tags or junction)
- initDb("drop table if exists bundles; create table bundles(record_id int(11) unsigned not null auto_increment, id char(36) not null, account_record_id int(11) unsigned not null, primary key(record_id)) engine=innodb;");
- initDb("drop table if exists subscriptions; create table subscriptions(record_id int(11) unsigned not null auto_increment, id char(36) not null, account_record_id int(11) unsigned not null, primary key(record_id)) engine=innodb;");
+ initDb("drop table if exists bundles; create table bundles(record_id int(11) unsigned not null auto_increment, id char(36) not null, " +
+ "account_record_id int(11) unsigned not null, tenant_record_id int(11) unsigned default 0, primary key(record_id)) engine=innodb;");
+ initDb("drop table if exists subscriptions; create table subscriptions(record_id int(11) unsigned not null auto_increment, id char(36) not null, " +
+ "account_record_id int(11) unsigned not null, tenant_record_id int(11) unsigned default 0, primary key(record_id)) engine=innodb;");
for (final String pack : new String[]{"account", "analytics", "entitlement", "util", "payment", "invoice", "junction", "tenant"}) {
final String ddl;
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
index 7026787..54be599 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/dao/TestNotificationSqlDao.java
@@ -80,7 +80,8 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
final String notificationKey = UUID.randomUUID().toString();
final DateTime effDt = new DateTime();
- final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
+ final Notification notif = new DefaultNotification("testBasic", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
+ null, internalCallContext.getTenantRecordId());
dao.insertNotification(notif, internalCallContext);
Thread.sleep(1000);
@@ -122,10 +123,12 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
public void testGetByAccountAndDate() throws InterruptedException {
final String notificationKey = UUID.randomUUID().toString();
final DateTime effDt = new DateTime();
- final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
+ final Notification notif1 = new DefaultNotification("testBasic1", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
+ null, internalCallContext.getTenantRecordId());
dao.insertNotification(notif1, internalCallContext);
- final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt);
+ final Notification notif2 = new DefaultNotification("testBasic2", hostname, notificationKey.getClass().getName(), notificationKey, accountId, effDt,
+ null, internalCallContext.getTenantRecordId());
dao.insertNotification(notif2, internalCallContext);
List<Notification> notifications = dao.getNotificationForAccountAndDate(accountId.toString(), effDt.toDate(), internalCallContext);
@@ -159,6 +162,8 @@ public class TestNotificationSqlDao extends UtilTestSuiteWithEmbeddedDB {
", processing_owner" +
", processing_available_date" +
", processing_state" +
+ ", account_record_id" +
+ ", tenant_record_id" +
" from notifications " +
" where " +
" id = '" + notificationId + "';")
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
index 8e4bd5c..d59100b 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/MockNotificationQueue.java
@@ -57,7 +57,8 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
public void recordFutureNotification(final DateTime futureNotificationTime, final UUID accountId,
final NotificationKey notificationKey, final InternalCallContext context) throws IOException {
final String json = objectMapper.writeValueAsString(notificationKey);
- final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime);
+ final Notification notification = new DefaultNotification("MockQueue", getHostname(), notificationKey.getClass().getName(), json, accountId, futureNotificationTime,
+ null, 0L);
synchronized (notifications) {
notifications.add(notification);
}
@@ -99,11 +100,12 @@ public class MockNotificationQueue extends NotificationQueueBase implements Noti
result = readyNotifications.size();
for (final Notification cur : readyNotifications) {
final NotificationKey key = deserializeEvent(cur.getNotificationKeyClass(), cur.getNotificationKey());
- getHandler().handleReadyNotification(key, cur.getEffectiveDate());
+ getHandler().handleReadyNotification(key, cur.getEffectiveDate(), cur.getAccountRecordId(), cur.getTenantRecordId());
final DefaultNotification processedNotification = new DefaultNotification(-1L, cur.getId(), getHostname(), getHostname(),
"MockQueue", getClock().getUTCNow().plus(CLAIM_TIME_MS),
PersistentQueueEntryLifecycleState.PROCESSED, cur.getNotificationKeyClass(),
- cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate());
+ cur.getNotificationKey(), cur.getAccountId(), cur.getEffectiveDate(),
+ cur.getAccountRecordId(), cur.getTenantRecordId());
oldNotifications.add(cur);
processedNotifications.add(processedNotification);
}
diff --git a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
index f45d417..af23273 100644
--- a/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
+++ b/util/src/test/java/com/ning/billing/util/notificationq/TestNotificationQueue.java
@@ -136,7 +136,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "foo",
new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
synchronized (expectedNotifications) {
log.info("Handler received key: " + notificationKey);
@@ -194,7 +194,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
synchronized (expectedNotifications) {
expectedNotifications.put(notificationKey, Boolean.TRUE);
expectedNotifications.notify();
@@ -297,7 +297,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final NotificationQueue queueFred = notificationQueueService.createNotificationQueue("UtilTest", "Fred", new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
log.info("Fred received key: " + notificationKey);
expectedNotificationsFred.put(notificationKey, Boolean.TRUE);
eventsReceived++;
@@ -307,7 +307,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final NotificationQueue queueBarney = notificationQueueService.createNotificationQueue("UtilTest", "Barney", new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey notificationKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
log.info("Barney received key: " + notificationKey);
expectedNotificationsBarney.put(notificationKey, Boolean.TRUE);
eventsReceived++;
@@ -391,7 +391,7 @@ public class TestNotificationQueue extends UtilTestSuiteWithEmbeddedDB {
final DefaultNotificationQueue queue = new DefaultNotificationQueue(dbi, clock, "test-svc", "many",
new NotificationQueueHandler() {
@Override
- public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime) {
+ public void handleReadyNotification(final NotificationKey inputKey, final DateTime eventDateTime, final Long accountRecordId, final Long tenantRecordId) {
if (inputKey.equals(notificationKey) || inputKey.equals(notificationKey2)) { //ignore stray events from other tests
log.info("Received notification with key: " + notificationKey);
eventsReceived++;