killbill-memoizeit

bus: pass the InternalCallContext when posting an event This

10/5/2012 8:33:56 PM

Changes

Details

diff --git a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
index 908c110..47418dd 100644
--- a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
@@ -112,7 +112,7 @@ public class AuditedAccountDao implements AccountDao {
 
                     final AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, context.getUserToken());
                     try {
-                        eventBus.postFromTransaction(creationEvent, transactionalDao);
+                        eventBus.postFromTransaction(creationEvent, transactionalDao, context);
                     } catch (EventBusException e) {
                         log.warn("Failed to post account creation event for account " + account.getId(), e);
                     }
@@ -158,7 +158,7 @@ public class AuditedAccountDao implements AccountDao {
                     final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
                     if (changeEvent.hasChanges()) {
                         try {
-                            eventBus.postFromTransaction(changeEvent, transactional);
+                            eventBus.postFromTransaction(changeEvent, transactional, context);
                         } catch (EventBusException e) {
                             log.warn("Failed to post account change event for account " + accountId, e);
                         }
@@ -207,7 +207,7 @@ public class AuditedAccountDao implements AccountDao {
                     final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
                     if (changeEvent.hasChanges()) {
                         try {
-                            eventBus.postFromTransaction(changeEvent, transactional);
+                            eventBus.postFromTransaction(changeEvent, transactional, context);
                         } catch (EventBusException e) {
                             log.warn("Failed to post account change event for account " + accountId, e);
                         }
diff --git a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
index 5824566..ed5966b 100644
--- a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
+++ b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
@@ -54,7 +54,7 @@ public class MockAccountDao implements AccountDao {
         accounts.put(account.getId(), account);
 
         try {
-            eventBus.post(new DefaultAccountCreationEvent(account, null));
+            eventBus.post(new DefaultAccountCreationEvent(account, null), context);
         } catch (EventBusException ex) {
             throw new RuntimeException(ex);
         }
@@ -97,7 +97,7 @@ public class MockAccountDao implements AccountDao {
         final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(account.getId(), null, currentAccount, account);
         if (changeEvent.hasChanges()) {
             try {
-                eventBus.post(changeEvent);
+                eventBus.post(changeEvent, context);
             } catch (EventBusException ex) {
                 throw new RuntimeException(ex);
             }
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index 8429aad..ad14c3e 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -265,26 +265,26 @@ public class TestAnalyticsService extends AnalyticsTestSuiteWithEmbeddedDB {
         Assert.assertNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
 
         // Send events and wait for the async part...
-        bus.post(accountCreationNotification);
+        bus.post(accountCreationNotification, internalCallContext);
         Thread.sleep(5000);
         Assert.assertNotNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
 
         // Test subscriptions integration - this is just to exercise the code. It's hard to test the actual subscriptions
         // as we would need to mock a bunch of APIs (see integration tests in Beatrix instead)
-        bus.post(transition);
+        bus.post(transition, internalCallContext);
         Thread.sleep(5000);
 
         // Test invoice integration - the account creation notification has triggered a BAC update
         Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
 
         // Post the same invoice event again - the invoice balance shouldn't change
-        bus.post(invoiceCreationNotification);
+        bus.post(invoiceCreationNotification, internalCallContext);
         Thread.sleep(5000);
         Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
 
         // Test payment integration - the fields have already been populated, just make sure the code is exercised
         // It's hard to test the actual payments fields though in bac, since we should mock the plugin
-        bus.post(paymentInfoNotification);
+        bus.post(paymentInfoNotification, internalCallContext);
         Thread.sleep(5000);
 
         // Test the shutdown sequence
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index 0fe716a..e0f5380 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -199,7 +199,8 @@ public class Engine implements EventListener, EntitlementService {
         }
 
         try {
-            eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId));
+            final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(subscription.getBundleId(), ObjectType.BUNDLE, context);
+            eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId), internalCallContext);
         } catch (EventBusException e) {
             log.warn("Failed to post entitlement event " + event, e);
         }
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 7b84b10..9cc874d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -237,7 +237,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                                                         context);
 
                 // Notify the Bus of the requested change
-                notifyBusOfRequestedChange(transactional, subscription, nextPhase);
+                notifyBusOfRequestedChange(transactional, subscription, nextPhase, context);
 
                 return null;
             }
@@ -312,7 +312,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
                 // Notify the Bus of the latest requested change, if needed
                 if (initialEvents.size() > 0) {
-                    notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1));
+                    notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1), context);
                 }
 
                 return null;
@@ -341,7 +341,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 transactional.insertAuditFromTransaction(audits, context);
 
                 // Notify the Bus of the latest requested change
-                notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1));
+                notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1), context);
 
                 return null;
             }
@@ -399,7 +399,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                     transactional.insertAuditFromTransaction(eventAudits, context);
 
                     // Notify the Bus of the latest requested change
-                    notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1));
+                    notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1), context);
                 }
 
                 return null;
@@ -431,7 +431,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
                 // Notify the Bus of the latest requested change
                 final EntitlementEvent finalEvent = changeEvents.get(changeEvents.size() - 1);
-                notifyBusOfRequestedChange(transactional, subscription, finalEvent);
+                notifyBusOfRequestedChange(transactional, subscription, finalEvent, context);
 
                 return null;
             }
@@ -454,7 +454,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                                                 context);
 
         // Notify the Bus of the requested change
-        notifyBusOfRequestedChange(transactional, subscription, cancelEvent);
+        notifyBusOfRequestedChange(transactional, subscription, cancelEvent, context);
     }
 
     private void cancelNextPhaseEventFromTransaction(final UUID subscriptionId, final EntitlementEventSqlDao dao, final InternalCallContext context) {
@@ -644,7 +644,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
                 try {
                     // Note: we don't send a requested change event here, but a repair event
                     final RepairEntitlementEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow());
-                    eventBus.postFromTransaction(busEvent, transactional);
+                    eventBus.postFromTransaction(busEvent, transactional, context);
                 } catch (EventBusException e) {
                     log.warn("Failed to post repair entitlement event for bundle " + bundleId, e);
                 }
@@ -697,9 +697,10 @@ public class AuditedEntitlementDao implements EntitlementDao {
         }
     }
 
-    private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription, final EntitlementEvent nextEvent) {
+    private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription,
+                                            final EntitlementEvent nextEvent, final InternalCallContext context) {
         try {
-            eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional);
+            eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional, context);
         } catch (EventBusException e) {
             log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
         }
@@ -738,7 +739,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
 
             // Notify the Bus of the latest requested change
             final EntitlementEvent finalEvent = curSubscription.getInitialEvents().get(curSubscription.getInitialEvents().size() - 1);
-            notifyBusOfRequestedChange(transactional, subData, finalEvent);
+            notifyBusOfRequestedChange(transactional, subData, finalEvent, context);
         }
 
         transBundleDao.insertBundle(bundleData, context);
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index bba2528..3275efc 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -46,12 +46,13 @@ import com.ning.billing.invoice.model.ExternalChargeInvoiceItem;
 import com.ning.billing.invoice.template.HtmlInvoiceGenerator;
 import com.ning.billing.util.api.TagApiException;
 import com.ning.billing.util.api.TagUserApi;
-import com.ning.billing.util.svcsapi.bus.Bus;
-import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
 import com.ning.billing.util.callcontext.CallContext;
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.callcontext.TenantContext;
 import com.ning.billing.util.dao.ObjectType;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
 import com.ning.billing.util.tag.ControlTagType;
 import com.ning.billing.util.tag.Tag;
 
@@ -153,7 +154,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
         // Retrieve the invoice for the account id
         final Invoice invoice = dao.getById(invoiceId, internalCallContextFactory.createInternalTenantContext(context));
         // This is for overdue
-        notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+        notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
     }
 
     @Override
@@ -164,7 +165,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
         // Retrieve the invoice for the account id
         final Invoice invoice = dao.getById(invoiceId, internalCallContextFactory.createInternalTenantContext(context));
         // This is for overdue
-        notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+        notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
     }
 
     @Override
@@ -279,9 +280,9 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
         return generator.generateInvoice(account, invoice, manualPay);
     }
 
-    private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken) {
+    private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken, final InternalCallContext context) {
         try {
-            eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken));
+            eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), context);
         } catch (EventBusException e) {
             log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
index 40cd8c9..a30b40c 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
@@ -402,7 +402,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
                 }
 
                 // Notify the bus since the balance of the invoice changed
-                notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken());
+                notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken(), context);
 
                 // Save audit logs
                 transactional.insertAuditFromTransaction(audits, context);
@@ -543,7 +543,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
 
                     // Notify the bus since the balance of the invoice changed
                     final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
-                    notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken());
+                    notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken(), context);
 
                     return chargeBack;
                 }
@@ -637,7 +637,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
                 }
 
                 // Notify the bus since the balance of the invoice changed
-                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
 
                 // Save audit logs
                 transactional.insertAuditFromTransaction(audits, context);
@@ -676,7 +676,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
                 insertItemAndAddCBAIfNeeded(transactional, credit, audits, context);
 
                 // Notify the bus since the balance of the invoice changed
-                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
 
                 // Save audit logs
                 transactional.insertAuditFromTransaction(audits, context);
@@ -697,7 +697,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
                 final InvoiceItem invoiceItemAdjustment = createAdjustmentItem(transactional, invoiceId, invoiceItemId, positiveAdjAmount,
                                                                                currency, effectiveDate, context);
                 insertItemAndAddCBAIfNeeded(transactional, invoiceItemAdjustment, audits, context);
-                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+                notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
 
                 // Save audit logs
                 transactional.insertAuditFromTransaction(audits, context);
@@ -978,9 +978,10 @@ public class AuditedInvoiceDao implements InvoiceDao {
         }
     }
 
-    private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId, final UUID userToken) {
+    private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId,
+                                              final UUID userToken, final InternalCallContext context) {
         try {
-            eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional);
+            eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional, context);
         } catch (EventBusException e) {
             log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
         }
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index 5ddce5d..c2cc396 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -167,7 +167,7 @@ public class InvoiceDispatcher {
                 log.info("Generated null invoice.");
                 if (!dryRun) {
                     final BusEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(), context.getUserToken());
-                    postEvent(event, accountId);
+                    postEvent(event, accountId, internalCallContext);
                 }
             } else {
                 log.info("Generated invoice {} with {} items.", invoice.getId().toString(), invoice.getNumberOfItems());
@@ -191,7 +191,7 @@ public class InvoiceDispatcher {
                                                                                        context.getUserToken());
 
                     if (isRealInvoiceWithItems) {
-                        postEvent(event, accountId);
+                        postEvent(event, accountId, internalCallContext);
                     }
                 }
             }
@@ -226,9 +226,9 @@ public class InvoiceDispatcher {
         }
     }
 
-    private void postEvent(final BusEvent event, final UUID accountId) {
+    private void postEvent(final BusEvent event, final UUID accountId, final InternalCallContext context) {
         try {
-            eventBus.post(event);
+            eventBus.post(event, context);
         } catch (EventBusException e) {
             log.error(String.format("Failed to post event %s for account %s", event.getBusEventType(), accountId), e);
         }
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 152a762..c689a21 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -59,7 +59,7 @@ public class MockInvoiceDao implements InvoiceDao {
         try {
             eventBus.post(new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
                                                           invoice.getBalance(), invoice.getCurrency(),
-                                                          null));
+                                                          null), context);
         } catch (Bus.EventBusException ex) {
             throw new RuntimeException(ex);
         }
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index d89032d..2096662 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -134,7 +134,7 @@ public class OverdueStateApplicator<T extends Blockable> {
         }
 
         try {
-            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()));
+            bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()), context);
         } catch (Exception e) {
             log.error("Error posting overdue change event to bus", e);
         }
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index e3f141a..5a953e1 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -214,7 +214,7 @@ public class PaymentProcessor extends ProcessorBase {
             // This means that events will be posted for null and zero dollar invoices (e.g. trials).
             final PaymentErrorEvent event = new DefaultPaymentErrorEvent(account.getId(), invoiceId, null,
                                                                          ErrorCode.PAYMENT_NO_DEFAULT_PAYMENT_METHOD.toString(), context.getUserToken());
-            postPaymentEvent(event, account.getId());
+            postPaymentEvent(event, account.getId(), context);
             throw e;
         }
 
@@ -484,7 +484,7 @@ public class PaymentProcessor extends ProcessorBase {
             throw new PaymentApiException(ErrorCode.INVOICE_NOT_FOUND, invoice.getId(), e.toString());
         } finally {
             if (event != null) {
-                postPaymentEvent(event, account.getId());
+                postPaymentEvent(event, account.getId(), context);
             }
         }
         return new DefaultPayment(payment, allAttempts, Collections.<RefundModelDao>emptyList());
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index 65ba0e3..30c72b0 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -125,12 +125,12 @@ public abstract class ProcessorBase {
         return getPaymentProviderPlugin(paymentMethodId, context);
     }
 
-    protected void postPaymentEvent(final BusEvent ev, final UUID accountId) {
+    protected void postPaymentEvent(final BusEvent ev, final UUID accountId, final InternalCallContext context) {
         if (ev == null) {
             return;
         }
         try {
-            eventBus.post(ev);
+            eventBus.post(ev, context);
         } catch (EventBusException e) {
             log.error("Failed to post Payment event event for account {} ", accountId, e);
         }
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index b37516d..60d9ad5 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -34,6 +34,7 @@ import com.ning.billing.payment.api.PaymentApi;
 import com.ning.billing.payment.api.PaymentMethodPlugin;
 import com.ning.billing.payment.glue.PaymentTestModuleWithMocks;
 import com.ning.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
 import com.ning.billing.util.svcsapi.bus.Bus;
 import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
 import com.ning.billing.util.callcontext.CallContext;
@@ -52,15 +53,17 @@ public class TestHelper {
     private final CallContext context;
     private final Bus eventBus;
     private final Clock clock;
+    private final InternalCallContextFactory internalCallContextFactory;
 
     @Inject
     public TestHelper(final CallContextFactory factory, final AccountUserApi accountUserApi, final InvoicePaymentApi invoicePaymentApi,
-                      final PaymentApi paymentApi, final Bus eventBus, final Clock clock) {
+                      final PaymentApi paymentApi, final Bus eventBus, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
         this.eventBus = eventBus;
         this.accountUserApi = accountUserApi;
         this.invoicePaymentApi = invoicePaymentApi;
         this.paymentApi = paymentApi;
         this.clock = clock;
+        this.internalCallContextFactory = internalCallContextFactory;
         context = factory.createCallContext(null, "Princess Buttercup", CallOrigin.TEST, UserType.TEST);
     }
 
@@ -94,7 +97,7 @@ public class TestHelper {
                                                                         invoice.getInvoiceDate(),
                                                                         context.getUserToken());
 
-        eventBus.post(event);
+        eventBus.post(event, internalCallContextFactory.createInternalCallContext(account.getId(), context));
         return invoice;
     }
 
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 440296c..e7d969d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -25,6 +25,7 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.svcsapi.bus.Bus;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -89,13 +90,13 @@ public class InMemoryBus implements Bus {
     }
 
     @Override
-    public void post(final BusEvent event) throws EventBusException {
+    public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
         checkInitialized("post");
         delegate.post(event);
     }
 
     @Override
-    public void postFromTransaction(final BusEvent event, final Transmogrifier dao) throws EventBusException {
+    public void postFromTransaction(final BusEvent event, final Transmogrifier dao, final InternalCallContext context) throws EventBusException {
         checkInitialized("postFromTransaction");
         delegate.post(event);
     }
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index fcb2d07..be503b7 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -22,8 +22,6 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
-import javax.annotation.Nullable;
-
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Transaction;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -109,7 +107,8 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
 
     @Override
     public int doProcessEvents() {
-        final InternalCallContext context = createCallContext(null);
+        // Note: retrieving and clearing bus events is not done per tenant (yet?)
+        final InternalCallContext context = internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM, null);
 
         final List<BusEventEntry> events = getNextBusEvent(context);
         if (events.size() == 0) {
@@ -155,36 +154,31 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
     }
 
     @Override
-    public void post(final BusEvent event) throws EventBusException {
+    public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
         dao.inTransaction(new Transaction<Void, PersistentBusSqlDao>() {
             @Override
             public Void inTransaction(final PersistentBusSqlDao transactional,
                                       final TransactionStatus status) throws Exception {
-                postFromTransaction(event, transactional);
+                postFromTransaction(event, context, transactional);
                 return null;
             }
         });
     }
 
     @Override
-    public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier)
+    public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier, final InternalCallContext context)
             throws EventBusException {
         final PersistentBusSqlDao transactional = transmogrifier.become(PersistentBusSqlDao.class);
-        postFromTransaction(event, transactional);
+        postFromTransaction(event, context, transactional);
     }
 
-    private void postFromTransaction(final BusEvent event, final PersistentBusSqlDao transactional) {
+    private void postFromTransaction(final BusEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
         try {
             final String json = objectMapper.writeValueAsString(event);
             final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json);
-            transactional.insertBusEvent(entry, createCallContext(event));
+            transactional.insertBusEvent(entry, context);
         } catch (Exception e) {
             log.error("Failed to post BusEvent " + event, e);
         }
     }
-
-    private InternalCallContext createCallContext(@Nullable final BusEvent event) {
-        return internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM,
-                                                                    event == null ? null : event.getUserToken());
-    }
 }
diff --git a/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java b/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
index 8212d49..adf3d01 100644
--- a/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
+++ b/util/src/main/java/com/ning/billing/util/svcsapi/bus/Bus.java
@@ -19,6 +19,7 @@ package com.ning.billing.util.svcsapi.bus;
 import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
 
 import com.ning.billing.util.bus.BusEvent;
+import com.ning.billing.util.callcontext.InternalCallContext;
 
 import com.google.common.eventbus.Subscribe;
 
@@ -32,7 +33,6 @@ import com.google.common.eventbus.Subscribe;
  */
 public interface Bus {
 
-
     public class EventBusException extends Exception {
 
         private static final long serialVersionUID = 12355236L;
@@ -64,38 +64,36 @@ public interface Bus {
      * Registers all handler methods on {@code object} to receive events.
      * Handler methods need to be Annotated with {@link Subscribe}
      *
-     * @param handlerInstance
+     * @param handlerInstance handler to register
      * @throws EventBusException if bus not been started yet
      */
     public void register(Object handlerInstance) throws EventBusException;
 
-
     /**
      * Unregister the handler for a particular type of event
      *
-     * @param handlerInstance
+     * @param handlerInstance handler to unregister
      * @throws EventBusException
      */
     public void unregister(Object handlerInstance) throws EventBusException;
 
-
     /**
      * Post an event asynchronously
      *
-     * @param event to be posted
+     * @param event   to be posted
+     * @param context call context. account record id and tenant id are expected to be populated
      * @throws EventBusException if bus not been started yet
      */
-    public void post(BusEvent event) throws EventBusException;
+    public void post(BusEvent event, InternalCallContext context) throws EventBusException;
 
     /**
      * Post an event from within a transaction.
      * Guarantees that the event is persisted on disk from within the same transaction
      *
-     * @param event to be posted
-     * @param dao   a valid DAO object obtained through the DBI.onDemand() API.
+     * @param event   to be posted
+     * @param dao     a valid DAO object obtained through the DBI.onDemand() API.
+     * @param context call context. account record id and tenant id are expected to be populated
      * @throws EventBusException if bus not been started yet
      */
-    public void postFromTransaction(BusEvent event, Transmogrifier dao) throws EventBusException;
-
-
+    public void postFromTransaction(BusEvent event, Transmogrifier dao, InternalCallContext context) throws EventBusException;
 }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
index c6d63a9..26b13ed 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
@@ -130,7 +130,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
                     tagEvent = tagEventBuilder.newUserTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
                 }
                 try {
-                    bus.postFromTransaction(tagEvent, AuditedTagDao.this.tagSqlDao);
+                    bus.postFromTransaction(tagEvent, tagSqlDao, context);
                 } catch (Bus.EventBusException e) {
                     log.warn("Failed to post tag creation event for tag " + tag.getId().toString(), e);
                 }
@@ -180,7 +180,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
                         tagEvent = tagEventBuilder.newUserTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
                     }
                     try {
-                        bus.postFromTransaction(tagEvent, tagSqlDao);
+                        bus.postFromTransaction(tagEvent, tagSqlDao, context);
                     } catch (Bus.EventBusException e) {
                         log.warn("Failed to post tag deletion event for tag " + tag.getId().toString(), e);
                     }
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
index 39538ce..5fe367e 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
@@ -144,7 +144,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
                         tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionCreationEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
                     }
                     try {
-                        bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+                        bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
                     } catch (Bus.EventBusException e) {
                         log.warn("Failed to post tag definition creation event for tag " + tagDefinition.getId(), e);
                     }
@@ -199,7 +199,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
                         tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionDeletionEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
                     }
                     try {
-                        bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+                        bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
                     } catch (Bus.EventBusException e) {
                         log.warn("Failed to post tag definition deletion event for tag " + tagDefinition.getId(), e);
                     }
diff --git a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
index 156ba7f..57a95ef 100644
--- a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
+++ b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
@@ -201,7 +201,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
             final MyEventHandler handler = new MyEventHandler(1);
             eventBus.register(handler);
 
-            eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+            eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
 
             Thread.sleep(50000);
         } catch (Exception ignored) {
@@ -215,7 +215,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
             eventBus.register(handler);
 
             for (int i = 0; i < nbEvents; i++) {
-                eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+                eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
             }
 
             final boolean completed = handler.waitForCompletion(10000);
@@ -231,9 +231,9 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
             eventBus.register(handler);
 
             for (int i = 0; i < 5; i++) {
-                eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()));
+                eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()), internalCallContext);
             }
-            eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+            eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
 
             final boolean completed = handler.waitForCompletion(10000);
             Assert.assertEquals(completed, true);