Details
diff --git a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
index 908c110..47418dd 100644
--- a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
@@ -112,7 +112,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, context.getUserToken());
try {
- eventBus.postFromTransaction(creationEvent, transactionalDao);
+ eventBus.postFromTransaction(creationEvent, transactionalDao, context);
} catch (EventBusException e) {
log.warn("Failed to post account creation event for account " + account.getId(), e);
}
@@ -158,7 +158,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.postFromTransaction(changeEvent, transactional);
+ eventBus.postFromTransaction(changeEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post account change event for account " + accountId, e);
}
@@ -207,7 +207,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.postFromTransaction(changeEvent, transactional);
+ eventBus.postFromTransaction(changeEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post account change event for account " + accountId, e);
}
diff --git a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
index 5824566..ed5966b 100644
--- a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
+++ b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
@@ -54,7 +54,7 @@ public class MockAccountDao implements AccountDao {
accounts.put(account.getId(), account);
try {
- eventBus.post(new DefaultAccountCreationEvent(account, null));
+ eventBus.post(new DefaultAccountCreationEvent(account, null), context);
} catch (EventBusException ex) {
throw new RuntimeException(ex);
}
@@ -97,7 +97,7 @@ public class MockAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(account.getId(), null, currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.post(changeEvent);
+ eventBus.post(changeEvent, context);
} catch (EventBusException ex) {
throw new RuntimeException(ex);
}
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index 8429aad..ad14c3e 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -265,26 +265,26 @@ public class TestAnalyticsService extends AnalyticsTestSuiteWithEmbeddedDB {
Assert.assertNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
// Send events and wait for the async part...
- bus.post(accountCreationNotification);
+ bus.post(accountCreationNotification, internalCallContext);
Thread.sleep(5000);
Assert.assertNotNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
// Test subscriptions integration - this is just to exercise the code. It's hard to test the actual subscriptions
// as we would need to mock a bunch of APIs (see integration tests in Beatrix instead)
- bus.post(transition);
+ bus.post(transition, internalCallContext);
Thread.sleep(5000);
// Test invoice integration - the account creation notification has triggered a BAC update
Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
// Post the same invoice event again - the invoice balance shouldn't change
- bus.post(invoiceCreationNotification);
+ bus.post(invoiceCreationNotification, internalCallContext);
Thread.sleep(5000);
Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
// Test payment integration - the fields have already been populated, just make sure the code is exercised
// It's hard to test the actual payments fields though in bac, since we should mock the plugin
- bus.post(paymentInfoNotification);
+ bus.post(paymentInfoNotification, internalCallContext);
Thread.sleep(5000);
// Test the shutdown sequence
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 0fe716a..e0f5380 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -199,7 +199,8 @@ public class Engine implements EventListener, EntitlementService {
}
try {
- eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId));
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(subscription.getBundleId(), ObjectType.BUNDLE, context);
+ eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId), internalCallContext);
} catch (EventBusException e) {
log.warn("Failed to post entitlement event " + event, e);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 7b84b10..9cc874d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -237,7 +237,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
context);
// Notify the Bus of the requested change
- notifyBusOfRequestedChange(transactional, subscription, nextPhase);
+ notifyBusOfRequestedChange(transactional, subscription, nextPhase, context);
return null;
}
@@ -312,7 +312,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change, if needed
if (initialEvents.size() > 0) {
- notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1));
+ notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1), context);
}
return null;
@@ -341,7 +341,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(audits, context);
// Notify the Bus of the latest requested change
- notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1));
+ notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1), context);
return null;
}
@@ -399,7 +399,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(eventAudits, context);
// Notify the Bus of the latest requested change
- notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1));
+ notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1), context);
}
return null;
@@ -431,7 +431,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change
final EntitlementEvent finalEvent = changeEvents.get(changeEvents.size() - 1);
- notifyBusOfRequestedChange(transactional, subscription, finalEvent);
+ notifyBusOfRequestedChange(transactional, subscription, finalEvent, context);
return null;
}
@@ -454,7 +454,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
context);
// Notify the Bus of the requested change
- notifyBusOfRequestedChange(transactional, subscription, cancelEvent);
+ notifyBusOfRequestedChange(transactional, subscription, cancelEvent, context);
}
private void cancelNextPhaseEventFromTransaction(final UUID subscriptionId, final EntitlementEventSqlDao dao, final InternalCallContext context) {
@@ -644,7 +644,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
try {
// Note: we don't send a requested change event here, but a repair event
final RepairEntitlementEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow());
- eventBus.postFromTransaction(busEvent, transactional);
+ eventBus.postFromTransaction(busEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post repair entitlement event for bundle " + bundleId, e);
}
@@ -697,9 +697,10 @@ public class AuditedEntitlementDao implements EntitlementDao {
}
}
- private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription, final EntitlementEvent nextEvent) {
+ private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription,
+ final EntitlementEvent nextEvent, final InternalCallContext context) {
try {
- eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional);
+ eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
}
@@ -738,7 +739,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change
final EntitlementEvent finalEvent = curSubscription.getInitialEvents().get(curSubscription.getInitialEvents().size() - 1);
- notifyBusOfRequestedChange(transactional, subData, finalEvent);
+ notifyBusOfRequestedChange(transactional, subData, finalEvent, context);
}
transBundleDao.insertBundle(bundleData, context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index bba2528..3275efc 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -46,12 +46,13 @@ import com.ning.billing.invoice.model.ExternalChargeInvoiceItem;
import com.ning.billing.invoice.template.HtmlInvoiceGenerator;
import com.ning.billing.util.api.TagApiException;
import com.ning.billing.util.api.TagUserApi;
-import com.ning.billing.util.svcsapi.bus.Bus;
-import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.tag.ControlTagType;
import com.ning.billing.util.tag.Tag;
@@ -153,7 +154,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
// Retrieve the invoice for the account id
final Invoice invoice = dao.getById(invoiceId, internalCallContextFactory.createInternalTenantContext(context));
// This is for overdue
- notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
}
@Override
@@ -164,7 +165,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
// Retrieve the invoice for the account id
final Invoice invoice = dao.getById(invoiceId, internalCallContextFactory.createInternalTenantContext(context));
// This is for overdue
- notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
}
@Override
@@ -279,9 +280,9 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
return generator.generateInvoice(account, invoice, manualPay);
}
- private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken) {
+ private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken, final InternalCallContext context) {
try {
- eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken));
+ eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), context);
} catch (EventBusException e) {
log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
index 40cd8c9..a30b40c 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
@@ -402,7 +402,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -543,7 +543,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
// Notify the bus since the balance of the invoice changed
final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
- notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken(), context);
return chargeBack;
}
@@ -637,7 +637,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -676,7 +676,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
insertItemAndAddCBAIfNeeded(transactional, credit, audits, context);
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -697,7 +697,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
final InvoiceItem invoiceItemAdjustment = createAdjustmentItem(transactional, invoiceId, invoiceItemId, positiveAdjAmount,
currency, effectiveDate, context);
insertItemAndAddCBAIfNeeded(transactional, invoiceItemAdjustment, audits, context);
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -978,9 +978,10 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
}
- private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId, final UUID userToken) {
+ private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId,
+ final UUID userToken, final InternalCallContext context) {
try {
- eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional);
+ eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index 5ddce5d..c2cc396 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -167,7 +167,7 @@ public class InvoiceDispatcher {
log.info("Generated null invoice.");
if (!dryRun) {
final BusEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(), context.getUserToken());
- postEvent(event, accountId);
+ postEvent(event, accountId, internalCallContext);
}
} else {
log.info("Generated invoice {} with {} items.", invoice.getId().toString(), invoice.getNumberOfItems());
@@ -191,7 +191,7 @@ public class InvoiceDispatcher {
context.getUserToken());
if (isRealInvoiceWithItems) {
- postEvent(event, accountId);
+ postEvent(event, accountId, internalCallContext);
}
}
}
@@ -226,9 +226,9 @@ public class InvoiceDispatcher {
}
}
- private void postEvent(final BusEvent event, final UUID accountId) {
+ private void postEvent(final BusEvent event, final UUID accountId, final InternalCallContext context) {
try {
- eventBus.post(event);
+ eventBus.post(event, context);
} catch (EventBusException e) {
log.error(String.format("Failed to post event %s for account %s", event.getBusEventType(), accountId), e);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 152a762..c689a21 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -59,7 +59,7 @@ public class MockInvoiceDao implements InvoiceDao {
try {
eventBus.post(new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
invoice.getBalance(), invoice.getCurrency(),
- null));
+ null), context);
} catch (Bus.EventBusException ex) {
throw new RuntimeException(ex);
}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index d89032d..2096662 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -134,7 +134,7 @@ public class OverdueStateApplicator<T extends Blockable> {
}
try {
- bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()));
+ bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()), context);
} catch (Exception e) {
log.error("Error posting overdue change event to bus", e);
}
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index e3f141a..5a953e1 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -214,7 +214,7 @@ public class PaymentProcessor extends ProcessorBase {
// This means that events will be posted for null and zero dollar invoices (e.g. trials).
final PaymentErrorEvent event = new DefaultPaymentErrorEvent(account.getId(), invoiceId, null,
ErrorCode.PAYMENT_NO_DEFAULT_PAYMENT_METHOD.toString(), context.getUserToken());
- postPaymentEvent(event, account.getId());
+ postPaymentEvent(event, account.getId(), context);
throw e;
}
@@ -484,7 +484,7 @@ public class PaymentProcessor extends ProcessorBase {
throw new PaymentApiException(ErrorCode.INVOICE_NOT_FOUND, invoice.getId(), e.toString());
} finally {
if (event != null) {
- postPaymentEvent(event, account.getId());
+ postPaymentEvent(event, account.getId(), context);
}
}
return new DefaultPayment(payment, allAttempts, Collections.<RefundModelDao>emptyList());
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index 65ba0e3..30c72b0 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -125,12 +125,12 @@ public abstract class ProcessorBase {
return getPaymentProviderPlugin(paymentMethodId, context);
}
- protected void postPaymentEvent(final BusEvent ev, final UUID accountId) {
+ protected void postPaymentEvent(final BusEvent ev, final UUID accountId, final InternalCallContext context) {
if (ev == null) {
return;
}
try {
- eventBus.post(ev);
+ eventBus.post(ev, context);
} catch (EventBusException e) {
log.error("Failed to post Payment event event for account {} ", accountId, e);
}
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index b37516d..60d9ad5 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -34,6 +34,7 @@ import com.ning.billing.payment.api.PaymentApi;
import com.ning.billing.payment.api.PaymentMethodPlugin;
import com.ning.billing.payment.glue.PaymentTestModuleWithMocks;
import com.ning.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
@@ -52,15 +53,17 @@ public class TestHelper {
private final CallContext context;
private final Bus eventBus;
private final Clock clock;
+ private final InternalCallContextFactory internalCallContextFactory;
@Inject
public TestHelper(final CallContextFactory factory, final AccountUserApi accountUserApi, final InvoicePaymentApi invoicePaymentApi,
- final PaymentApi paymentApi, final Bus eventBus, final Clock clock) {
+ final PaymentApi paymentApi, final Bus eventBus, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
this.eventBus = eventBus;
this.accountUserApi = accountUserApi;
this.invoicePaymentApi = invoicePaymentApi;
this.paymentApi = paymentApi;
this.clock = clock;
+ this.internalCallContextFactory = internalCallContextFactory;
context = factory.createCallContext(null, "Princess Buttercup", CallOrigin.TEST, UserType.TEST);
}
@@ -94,7 +97,7 @@ public class TestHelper {
invoice.getInvoiceDate(),
context.getUserToken());
- eventBus.post(event);
+ eventBus.post(event, internalCallContextFactory.createInternalCallContext(account.getId(), context));
return invoice;
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 440296c..e7d969d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -25,6 +25,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.eventbus.AsyncEventBus;
@@ -89,13 +90,13 @@ public class InMemoryBus implements Bus {
}
@Override
- public void post(final BusEvent event) throws EventBusException {
+ public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
checkInitialized("post");
delegate.post(event);
}
@Override
- public void postFromTransaction(final BusEvent event, final Transmogrifier dao) throws EventBusException {
+ public void postFromTransaction(final BusEvent event, final Transmogrifier dao, final InternalCallContext context) throws EventBusException {
checkInitialized("postFromTransaction");
delegate.post(event);
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index fcb2d07..be503b7 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -22,8 +22,6 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import javax.annotation.Nullable;
-
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
@@ -109,7 +107,8 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
@Override
public int doProcessEvents() {
- final InternalCallContext context = createCallContext(null);
+ // Note: retrieving and clearing bus events is not done per tenant (yet?)
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM, null);
final List<BusEventEntry> events = getNextBusEvent(context);
if (events.size() == 0) {
@@ -155,36 +154,31 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
}
@Override
- public void post(final BusEvent event) throws EventBusException {
+ public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
dao.inTransaction(new Transaction<Void, PersistentBusSqlDao>() {
@Override
public Void inTransaction(final PersistentBusSqlDao transactional,
final TransactionStatus status) throws Exception {
- postFromTransaction(event, transactional);
+ postFromTransaction(event, context, transactional);
return null;
}
});
}
@Override
- public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier)
+ public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier, final InternalCallContext context)
throws EventBusException {
final PersistentBusSqlDao transactional = transmogrifier.become(PersistentBusSqlDao.class);
- postFromTransaction(event, transactional);
+ postFromTransaction(event, context, transactional);
}
- private void postFromTransaction(final BusEvent event, final PersistentBusSqlDao transactional) {
+ private void postFromTransaction(final BusEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
try {
final String json = objectMapper.writeValueAsString(event);
final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json);
- transactional.insertBusEvent(entry, createCallContext(event));
+ transactional.insertBusEvent(entry, context);
} catch (Exception e) {
log.error("Failed to post BusEvent " + event, e);
}
}
-
- private InternalCallContext createCallContext(@Nullable final BusEvent event) {
- return internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM,
- event == null ? null : event.getUserToken());
- }
}
diff --git a/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java b/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
index 8212d49..adf3d01 100644
--- a/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
+++ b/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.svcsapi.bus;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import com.ning.billing.util.bus.BusEvent;
+import com.ning.billing.util.callcontext.InternalCallContext;
import com.google.common.eventbus.Subscribe;
@@ -32,7 +33,6 @@ import com.google.common.eventbus.Subscribe;
*/
public interface Bus {
-
public class EventBusException extends Exception {
private static final long serialVersionUID = 12355236L;
@@ -64,38 +64,36 @@ public interface Bus {
* Registers all handler methods on {@code object} to receive events.
* Handler methods need to be Annotated with {@link Subscribe}
*
- * @param handlerInstance
+ * @param handlerInstance handler to register
* @throws EventBusException if bus not been started yet
*/
public void register(Object handlerInstance) throws EventBusException;
-
/**
* Unregister the handler for a particular type of event
*
- * @param handlerInstance
+ * @param handlerInstance handler to unregister
* @throws EventBusException
*/
public void unregister(Object handlerInstance) throws EventBusException;
-
/**
* Post an event asynchronously
*
- * @param event to be posted
+ * @param event to be posted
+ * @param context call context. account record id and tenant id are expected to be populated
* @throws EventBusException if bus not been started yet
*/
- public void post(BusEvent event) throws EventBusException;
+ public void post(BusEvent event, InternalCallContext context) throws EventBusException;
/**
* Post an event from within a transaction.
* Guarantees that the event is persisted on disk from within the same transaction
*
- * @param event to be posted
- * @param dao a valid DAO object obtained through the DBI.onDemand() API.
+ * @param event to be posted
+ * @param dao a valid DAO object obtained through the DBI.onDemand() API.
+ * @param context call context. account record id and tenant id are expected to be populated
* @throws EventBusException if bus not been started yet
*/
- public void postFromTransaction(BusEvent event, Transmogrifier dao) throws EventBusException;
-
-
+ public void postFromTransaction(BusEvent event, Transmogrifier dao, InternalCallContext context) throws EventBusException;
}
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
index c6d63a9..26b13ed 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
@@ -130,7 +130,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
tagEvent = tagEventBuilder.newUserTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagEvent, AuditedTagDao.this.tagSqlDao);
+ bus.postFromTransaction(tagEvent, tagSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag creation event for tag " + tag.getId().toString(), e);
}
@@ -180,7 +180,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
tagEvent = tagEventBuilder.newUserTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagEvent, tagSqlDao);
+ bus.postFromTransaction(tagEvent, tagSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag deletion event for tag " + tag.getId().toString(), e);
}
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
index 39538ce..5fe367e 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
@@ -144,7 +144,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionCreationEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+ bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag definition creation event for tag " + tagDefinition.getId(), e);
}
@@ -199,7 +199,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionDeletionEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+ bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag definition deletion event for tag " + tagDefinition.getId(), e);
}
diff --git a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
index 156ba7f..57a95ef 100644
--- a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
+++ b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
@@ -201,7 +201,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
final MyEventHandler handler = new MyEventHandler(1);
eventBus.register(handler);
- eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
Thread.sleep(50000);
} catch (Exception ignored) {
@@ -215,7 +215,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
eventBus.register(handler);
for (int i = 0; i < nbEvents; i++) {
- eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
}
final boolean completed = handler.waitForCompletion(10000);
@@ -231,9 +231,9 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
eventBus.register(handler);
for (int i = 0; i < 5; i++) {
- eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()));
+ eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()), internalCallContext);
}
- eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
final boolean completed = handler.waitForCompletion(10000);
Assert.assertEquals(completed, true);