killbill-memoizeit
Changes
analytics/src/test/java/com/ning/billing/analytics/api/user/TestDefaultAnalyticsUserApi.java 5(+2 -3)
entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java 25(+13 -12)
entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java 2(+1 -1)
invoice/src/test/java/com/ning/billing/invoice/api/invoice/TestDefaultInvoicePaymentApi.java 2(+1 -1)
Details
diff --git a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
index b770d14..3854d11 100644
--- a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
@@ -34,8 +34,9 @@ import com.ning.billing.account.api.AccountCreationEvent;
import com.ning.billing.account.api.user.DefaultAccountChangeEvent;
import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
import com.ning.billing.util.ChangeType;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.dao.EntityAudit;
@@ -51,11 +52,13 @@ public class AuditedAccountDao implements AccountDao {
private final AccountSqlDao accountSqlDao;
private final Bus eventBus;
+ private final InternalCallContextFactory internalCallContextFactory;
@Inject
- public AuditedAccountDao(final IDBI dbi, final Bus eventBus) {
+ public AuditedAccountDao(final IDBI dbi, final Bus eventBus, final InternalCallContextFactory internalCallContextFactory) {
this.eventBus = eventBus;
this.accountSqlDao = dbi.onDemand(AccountSqlDao.class);
+ this.internalCallContextFactory = internalCallContextFactory;
}
@Override
@@ -112,7 +115,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, context.getUserToken());
try {
- eventBus.postFromTransaction(creationEvent, transactionalDao);
+ eventBus.postFromTransaction(creationEvent, transactionalDao, internalCallContextFactory.createInternalCallContext(recordId, context));
} catch (EventBusException e) {
log.warn("Failed to post account creation event for account " + account.getId(), e);
}
@@ -158,7 +161,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.postFromTransaction(changeEvent, transactional);
+ eventBus.postFromTransaction(changeEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post account change event for account " + accountId, e);
}
@@ -207,7 +210,7 @@ public class AuditedAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(accountId, context.getUserToken(), currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.postFromTransaction(changeEvent, transactional);
+ eventBus.postFromTransaction(changeEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post account change event for account " + accountId, e);
}
diff --git a/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApi.java b/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApi.java
index ed1ad17..ac3869c 100644
--- a/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApi.java
+++ b/account/src/test/java/com/ning/billing/account/api/user/TestDefaultAccountUserApi.java
@@ -36,7 +36,7 @@ import com.ning.billing.account.dao.AccountEmailDao;
import com.ning.billing.account.dao.MockAccountDao;
import com.ning.billing.account.dao.MockAccountEmailDao;
import com.ning.billing.catalog.api.Currency;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index cb4e87f..594faa5 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -22,8 +22,10 @@ import org.skife.jdbi.v2.IDBI;
import org.testng.annotations.BeforeClass;
import com.ning.billing.account.AccountTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.tag.api.user.TagEventBuilder;
@@ -48,7 +50,7 @@ public abstract class AccountDaoTestBase extends AccountTestSuiteWithEmbeddedDB
final BusService busService = new DefaultBusService(bus);
((DefaultBusService) busService).startBus();
- accountDao = new AuditedAccountDao(dbi, bus);
+ accountDao = new AuditedAccountDao(dbi, bus, new InternalCallContextFactory(dbi, new ClockMock()));
// Health check test to make sure MySQL is setup properly
accountDao.test(internalCallContext);
diff --git a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
index 9aff9e2..ed5966b 100644
--- a/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
+++ b/account/src/test/java/com/ning/billing/account/dao/MockAccountDao.java
@@ -26,8 +26,8 @@ import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountChangeEvent;
import com.ning.billing.account.api.user.DefaultAccountChangeEvent;
import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.entity.EntityPersistenceException;
@@ -54,7 +54,7 @@ public class MockAccountDao implements AccountDao {
accounts.put(account.getId(), account);
try {
- eventBus.post(new DefaultAccountCreationEvent(account, null));
+ eventBus.post(new DefaultAccountCreationEvent(account, null), context);
} catch (EventBusException ex) {
throw new RuntimeException(ex);
}
@@ -97,7 +97,7 @@ public class MockAccountDao implements AccountDao {
final AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(account.getId(), null, currentAccount, account);
if (changeEvent.hasChanges()) {
try {
- eventBus.post(changeEvent);
+ eventBus.post(changeEvent, context);
} catch (EventBusException ex) {
throw new RuntimeException(ex);
}
diff --git a/analytics/src/main/java/com/ning/billing/analytics/api/DefaultAnalyticsService.java b/analytics/src/main/java/com/ning/billing/analytics/api/DefaultAnalyticsService.java
index 3f61530..d85cd38 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/api/DefaultAnalyticsService.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/api/DefaultAnalyticsService.java
@@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.ning.billing.analytics.AnalyticsListener;
import com.ning.billing.lifecycle.LifecycleHandlerType;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
public class DefaultAnalyticsService implements AnalyticsService {
private static final Logger log = LoggerFactory.getLogger(DefaultAnalyticsService.class);
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index dc3e1a8..ad14c3e 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -79,7 +79,7 @@ import com.ning.billing.payment.api.PaymentStatus;
import com.ning.billing.payment.dao.PaymentAttemptModelDao;
import com.ning.billing.payment.dao.PaymentDao;
import com.ning.billing.payment.dao.PaymentModelDao;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.DefaultClock;
@@ -265,26 +265,26 @@ public class TestAnalyticsService extends AnalyticsTestSuiteWithEmbeddedDB {
Assert.assertNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
// Send events and wait for the async part...
- bus.post(accountCreationNotification);
+ bus.post(accountCreationNotification, internalCallContext);
Thread.sleep(5000);
Assert.assertNotNull(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext));
// Test subscriptions integration - this is just to exercise the code. It's hard to test the actual subscriptions
// as we would need to mock a bunch of APIs (see integration tests in Beatrix instead)
- bus.post(transition);
+ bus.post(transition, internalCallContext);
Thread.sleep(5000);
// Test invoice integration - the account creation notification has triggered a BAC update
Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
// Post the same invoice event again - the invoice balance shouldn't change
- bus.post(invoiceCreationNotification);
+ bus.post(invoiceCreationNotification, internalCallContext);
Thread.sleep(5000);
Assert.assertEquals(accountSqlDao.getAccountByKey(ACCOUNT_KEY, internalCallContext).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT), 1);
// Test payment integration - the fields have already been populated, just make sure the code is exercised
// It's hard to test the actual payments fields though in bac, since we should mock the plugin
- bus.post(paymentInfoNotification);
+ bus.post(paymentInfoNotification, internalCallContext);
Thread.sleep(5000);
// Test the shutdown sequence
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/user/TestDefaultAnalyticsUserApi.java b/analytics/src/test/java/com/ning/billing/analytics/api/user/TestDefaultAnalyticsUserApi.java
index b76e1cf..70cb427 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/user/TestDefaultAnalyticsUserApi.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/user/TestDefaultAnalyticsUserApi.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
import java.util.UUID;
import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
import org.mockito.Mockito;
import org.skife.jdbi.v2.IDBI;
import org.testng.Assert;
@@ -97,7 +96,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
final TimeSeriesData data = analyticsUserApi.getAccountsCreatedOverTime(tenantContext);
Assert.assertEquals(data.getDates().size(), 1);
- Assert.assertEquals(data.getDates().get(0), new LocalDate());
+ Assert.assertEquals(data.getDates().get(0), clock.getUTCToday());
Assert.assertEquals(data.getValues().size(), 1);
Assert.assertEquals(data.getValues().get(0), (double) 1);
}
@@ -131,7 +130,7 @@ public class TestDefaultAnalyticsUserApi extends AnalyticsTestSuiteWithEmbeddedD
final TimeSeriesData data = analyticsUserApi.getSubscriptionsCreatedOverTime(productType, phase.getName(), tenantContext);
Assert.assertEquals(data.getDates().size(), 1);
- Assert.assertEquals(data.getDates().get(0), new LocalDate());
+ Assert.assertEquals(data.getDates().get(0), clock.getUTCToday());
Assert.assertEquals(data.getValues().size(), 1);
Assert.assertEquals(data.getValues().get(0), (double) 1);
}
diff --git a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
index c4d1c4d..51700d1 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/TestBusinessTagRecorder.java
@@ -54,6 +54,7 @@ import com.ning.billing.mock.MockAccountBuilder;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.callcontext.DefaultCallContextFactory;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.clock.DefaultClock;
import com.ning.billing.util.dao.ObjectType;
import com.ning.billing.util.notificationq.DefaultNotificationQueueService;
@@ -75,7 +76,7 @@ public class TestBusinessTagRecorder extends AnalyticsTestSuiteWithEmbeddedDB {
final BusinessInvoicePaymentTagSqlDao invoicePaymentTagSqlDao = dbi.onDemand(BusinessInvoicePaymentTagSqlDao.class);
subscriptionTransitionTagSqlDao = dbi.onDemand(BusinessSubscriptionTransitionTagSqlDao.class);
eventBus = new InMemoryBus();
- final AccountDao accountDao = new AuditedAccountDao(dbi, eventBus);
+ final AccountDao accountDao = new AuditedAccountDao(dbi, eventBus, new InternalCallContextFactory(dbi, new ClockMock()));
final AccountEmailDao accountEmailDao = new AuditedAccountEmailDao(dbi);
final DefaultClock clock = new DefaultClock();
callContextFactory = new DefaultCallContextFactory(clock);
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixModule.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixModule.java
index e82d1bd..0f1bdd3 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixModule.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/BeatrixModule.java
@@ -52,7 +52,7 @@ import com.ning.billing.overdue.OverdueService;
import com.ning.billing.payment.api.PaymentService;
import com.ning.billing.payment.glue.PaymentModule;
import com.ning.billing.payment.provider.MockPaymentProviderPluginModule;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.email.EmailModule;
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/MockOverdueService.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/MockOverdueService.java
index 7dc1ddb..1af2d4c 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/MockOverdueService.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/overdue/MockOverdueService.java
@@ -23,7 +23,7 @@ import com.ning.billing.overdue.OverdueUserApi;
import com.ning.billing.overdue.listener.OverdueListener;
import com.ning.billing.overdue.service.DefaultOverdueService;
import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
public class MockOverdueService extends DefaultOverdueService {
@Inject
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
index 910895e..0d3e8f8 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegrationBase.java
@@ -76,7 +76,7 @@ import com.ning.billing.payment.api.PaymentApiException;
import com.ning.billing.payment.api.PaymentMethodPlugin;
import com.ning.billing.payment.provider.MockPaymentProviderPlugin;
import com.ning.billing.util.api.TagUserApi;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.clock.ClockMock;
import com.google.common.base.Function;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
index a52efb9..e0f5380 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/core/Engine.java
@@ -49,8 +49,8 @@ import com.ning.billing.entitlement.events.user.ApiEventCancel;
import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.CallOrigin;
@@ -199,7 +199,8 @@ public class Engine implements EventListener, EntitlementService {
}
try {
- eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId));
+ final InternalCallContext internalCallContext = internalCallContextFactory.createInternalCallContext(subscription.getBundleId(), ObjectType.BUNDLE, context);
+ eventBus.post(subscription.getTransitionFromEvent(event, theRealSeqId), internalCallContext);
} catch (EventBusException e) {
log.warn("Failed to post entitlement event " + event, e);
}
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
index 7ec7bca..9cc874d 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/AuditedEntitlementDao.java
@@ -67,8 +67,8 @@ import com.ning.billing.entitlement.events.user.ApiEventChange;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.entitlement.exceptions.EntitlementError;
import com.ning.billing.util.ChangeType;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.clock.Clock;
@@ -237,7 +237,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
context);
// Notify the Bus of the requested change
- notifyBusOfRequestedChange(transactional, subscription, nextPhase);
+ notifyBusOfRequestedChange(transactional, subscription, nextPhase, context);
return null;
}
@@ -312,7 +312,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change, if needed
if (initialEvents.size() > 0) {
- notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1));
+ notifyBusOfRequestedChange(eventsDaoFromSameTransaction, subscription, initialEvents.get(initialEvents.size() - 1), context);
}
return null;
@@ -341,7 +341,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(audits, context);
// Notify the Bus of the latest requested change
- notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1));
+ notifyBusOfRequestedChange(transactional, subscription, recreateEvents.get(recreateEvents.size() - 1), context);
return null;
}
@@ -399,7 +399,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
transactional.insertAuditFromTransaction(eventAudits, context);
// Notify the Bus of the latest requested change
- notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1));
+ notifyBusOfRequestedChange(transactional, subscription, uncancelEvents.get(uncancelEvents.size() - 1), context);
}
return null;
@@ -431,7 +431,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change
final EntitlementEvent finalEvent = changeEvents.get(changeEvents.size() - 1);
- notifyBusOfRequestedChange(transactional, subscription, finalEvent);
+ notifyBusOfRequestedChange(transactional, subscription, finalEvent, context);
return null;
}
@@ -454,7 +454,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
context);
// Notify the Bus of the requested change
- notifyBusOfRequestedChange(transactional, subscription, cancelEvent);
+ notifyBusOfRequestedChange(transactional, subscription, cancelEvent, context);
}
private void cancelNextPhaseEventFromTransaction(final UUID subscriptionId, final EntitlementEventSqlDao dao, final InternalCallContext context) {
@@ -644,7 +644,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
try {
// Note: we don't send a requested change event here, but a repair event
final RepairEntitlementEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow());
- eventBus.postFromTransaction(busEvent, transactional);
+ eventBus.postFromTransaction(busEvent, transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post repair entitlement event for bundle " + bundleId, e);
}
@@ -697,9 +697,10 @@ public class AuditedEntitlementDao implements EntitlementDao {
}
}
- private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription, final EntitlementEvent nextEvent) {
+ private void notifyBusOfRequestedChange(final EntitlementEventSqlDao transactional, final SubscriptionData subscription,
+ final EntitlementEvent nextEvent, final InternalCallContext context) {
try {
- eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional);
+ eventBus.postFromTransaction(new DefaultRequestedSubscriptionEvent(subscription, nextEvent), transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post requested change event for subscription " + subscription.getId(), e);
}
@@ -738,7 +739,7 @@ public class AuditedEntitlementDao implements EntitlementDao {
// Notify the Bus of the latest requested change
final EntitlementEvent finalEvent = curSubscription.getInitialEvents().get(curSubscription.getInitialEvents().size() - 1);
- notifyBusOfRequestedChange(transactional, subData, finalEvent);
+ notifyBusOfRequestedChange(transactional, subData, finalEvent, context);
}
transBundleDao.insertBundle(bundleData, context);
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index 40cf2f4..55dd482 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -80,7 +80,7 @@ import com.ning.billing.entitlement.events.phase.PhaseEvent;
import com.ning.billing.entitlement.events.user.ApiEvent;
import com.ning.billing.entitlement.events.user.ApiEventType;
import com.ning.billing.mock.MockAccountBuilder;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
index d4aeaab..e525e46 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/engine/dao/MockEntitlementDaoSql.java
@@ -20,7 +20,7 @@ import org.skife.jdbi.v2.IDBI;
import com.ning.billing.catalog.api.CatalogService;
import com.ning.billing.entitlement.engine.addon.AddonUtils;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java b/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
index c6e57c7..aa28c01 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/DefaultInvoiceService.java
@@ -21,7 +21,7 @@ import com.ning.billing.invoice.TagHandler;
import com.ning.billing.invoice.notification.NextBillingDateNotifier;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
diff --git a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
index 759c867..4be08a1 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/api/user/DefaultInvoiceUserApi.java
@@ -44,8 +44,6 @@ import com.ning.billing.invoice.model.CreditAdjInvoiceItem;
import com.ning.billing.invoice.model.ExternalChargeInvoiceItem;
import com.ning.billing.invoice.template.HtmlInvoiceGenerator;
import com.ning.billing.util.api.TagApiException;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -54,6 +52,8 @@ import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.dao.ObjectType;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.tag.ControlTagType;
import com.ning.billing.util.tag.Tag;
@@ -155,7 +155,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
// Retrieve the invoice for the account id
final Invoice invoice = dao.getById(invoiceId, internalContext);
// This is for overdue
- notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
}
@Override
@@ -167,7 +167,7 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
// Retrieve the invoice for the account id
final Invoice invoice = dao.getById(invoiceId, internalContext);
// This is for overdue
- notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(invoiceId, invoice.getAccountId(), context.getUserToken(), internalCallContextFactory.createInternalCallContext(invoice.getAccountId(), context));
}
@Override
@@ -283,9 +283,9 @@ public class DefaultInvoiceUserApi implements InvoiceUserApi {
return generator.generateInvoice(account, invoice, manualPay);
}
- private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken) {
+ private void notifyBusOfInvoiceAdjustment(final UUID invoiceId, final UUID accountId, final UUID userToken, final InternalCallContext context) {
try {
- eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken));
+ eventBus.post(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), context);
} catch (EventBusException e) {
log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
index 61bc576..90f071d 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/AuditedInvoiceDao.java
@@ -55,13 +55,13 @@ import com.ning.billing.invoice.model.RecurringInvoiceItem;
import com.ning.billing.invoice.model.RefundAdjInvoiceItem;
import com.ning.billing.invoice.notification.NextBillingDatePoster;
import com.ning.billing.util.ChangeType;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.dao.EntityAudit;
import com.ning.billing.util.dao.TableName;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
@@ -399,7 +399,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoice.getId(), invoice.getAccountId(), context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -540,7 +540,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
// Notify the bus since the balance of the invoice changed
final UUID accountId = transactional.getAccountIdFromInvoicePaymentId(chargeBack.getId().toString(), context);
- notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, payment.getInvoiceId(), accountId, context.getUserToken(), context);
return chargeBack;
}
@@ -634,7 +634,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -673,7 +673,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
insertItemAndAddCBAIfNeeded(transactional, credit, audits, context);
// Notify the bus since the balance of the invoice changed
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -694,7 +694,7 @@ public class AuditedInvoiceDao implements InvoiceDao {
final InvoiceItem invoiceItemAdjustment = createAdjustmentItem(transactional, invoiceId, invoiceItemId, positiveAdjAmount,
currency, effectiveDate, context);
insertItemAndAddCBAIfNeeded(transactional, invoiceItemAdjustment, audits, context);
- notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken());
+ notifyBusOfInvoiceAdjustment(transactional, invoiceId, accountId, context.getUserToken(), context);
// Save audit logs
transactional.insertAuditFromTransaction(audits, context);
@@ -975,9 +975,10 @@ public class AuditedInvoiceDao implements InvoiceDao {
}
}
- private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId, final UUID userToken) {
+ private void notifyBusOfInvoiceAdjustment(final Transmogrifier transactional, final UUID invoiceId, final UUID accountId,
+ final UUID userToken, final InternalCallContext context) {
try {
- eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional);
+ eventBus.postFromTransaction(new DefaultInvoiceAdjustmentEvent(invoiceId, accountId, userToken), transactional, context);
} catch (EventBusException e) {
log.warn("Failed to post adjustment event for invoice " + invoiceId, e);
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
index ba0d380..8fbecb4 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/InvoiceDispatcher.java
@@ -48,8 +48,6 @@ import com.ning.billing.invoice.generator.InvoiceDateUtils;
import com.ning.billing.invoice.generator.InvoiceGenerator;
import com.ning.billing.invoice.model.FixedPriceInvoiceItem;
import com.ning.billing.invoice.model.RecurringInvoiceItem;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.bus.BusEvent;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.InternalCallContext;
@@ -63,6 +61,8 @@ import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
import com.ning.billing.util.svcapi.junction.BillingEventSet;
import com.ning.billing.util.svcapi.junction.BillingInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
@@ -175,7 +175,7 @@ public class InvoiceDispatcher {
log.info("Generated null invoice.");
if (!dryRun) {
final BusEvent event = new DefaultNullInvoiceEvent(accountId, clock.getUTCToday(), context.getUserToken());
- postEvent(event, accountId);
+ postEvent(event, accountId, internalCallContext);
}
} else {
log.info("Generated invoice {} with {} items.", invoice.getId().toString(), invoice.getNumberOfItems());
@@ -199,7 +199,7 @@ public class InvoiceDispatcher {
context.getUserToken());
if (isRealInvoiceWithItems) {
- postEvent(event, accountId);
+ postEvent(event, accountId, internalCallContext);
}
}
}
@@ -234,9 +234,9 @@ public class InvoiceDispatcher {
}
}
- private void postEvent(final BusEvent event, final UUID accountId) {
+ private void postEvent(final BusEvent event, final UUID accountId, final InternalCallContext context) {
try {
- eventBus.post(event);
+ eventBus.post(event, context);
} catch (EventBusException e) {
log.error(String.format("Failed to post event %s for account %s", event.getBusEventType(), accountId), e);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/api/invoice/TestDefaultInvoicePaymentApi.java b/invoice/src/test/java/com/ning/billing/invoice/api/invoice/TestDefaultInvoicePaymentApi.java
index 0fb1ae3..cbe61de 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/api/invoice/TestDefaultInvoicePaymentApi.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/api/invoice/TestDefaultInvoicePaymentApi.java
@@ -47,10 +47,10 @@ import com.ning.billing.invoice.dao.InvoiceItemSqlDao;
import com.ning.billing.invoice.dao.InvoiceSqlDao;
import com.ning.billing.invoice.notification.MockNextBillingDatePoster;
import com.ning.billing.invoice.notification.NextBillingDatePoster;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
index 908fb45..e1b7176 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/api/migration/InvoiceApiTestBase.java
@@ -50,7 +50,7 @@ import com.ning.billing.invoice.notification.NullInvoiceNotifier;
import com.ning.billing.invoice.tests.InvoicingTestBase;
import com.ning.billing.mock.api.MockBillCycleDay;
import com.ning.billing.util.api.TagUserApi;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
index f5e0046..01a6bff 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/InvoiceDaoTestBase.java
@@ -35,10 +35,10 @@ import com.ning.billing.invoice.glue.InvoiceModuleWithEmbeddedDb;
import com.ning.billing.invoice.notification.MockNextBillingDatePoster;
import com.ning.billing.invoice.notification.NextBillingDatePoster;
import com.ning.billing.invoice.tests.InvoicingTestBase;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.tag.api.user.TagEventBuilder;
public class InvoiceDaoTestBase extends InvoicingTestBase {
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
index 58f5990..c689a21 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/MockInvoiceDao.java
@@ -34,7 +34,7 @@ import com.ning.billing.invoice.api.InvoiceApiException;
import com.ning.billing.invoice.api.InvoiceItem;
import com.ning.billing.invoice.api.InvoicePayment;
import com.ning.billing.invoice.api.user.DefaultInvoiceCreationEvent;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -59,7 +59,7 @@ public class MockInvoiceDao implements InvoiceDao {
try {
eventBus.post(new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
invoice.getBalance(), invoice.getCurrency(),
- null));
+ null), context);
} catch (Bus.EventBusException ex) {
throw new RuntimeException(ex);
}
diff --git a/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDao.java b/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDao.java
index 3d860a1..7a114a1 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDao.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/dao/TestDefaultInvoiceDao.java
@@ -35,9 +35,9 @@ import com.ning.billing.invoice.api.Invoice;
import com.ning.billing.invoice.api.InvoiceApiException;
import com.ning.billing.invoice.api.InvoicePayment;
import com.ning.billing.invoice.notification.NextBillingDatePoster;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.clock.Clock;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.tag.dao.MockTagDao;
import com.ning.billing.util.tag.dao.MockTagDefinitionDao;
import com.ning.billing.util.tag.dao.TagDao;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
index 6d287e1..9f3c2f9 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/notification/TestNextBillingDateNotifier.java
@@ -50,7 +50,7 @@ import com.ning.billing.invoice.glue.InvoiceModuleWithMocks;
import com.ning.billing.lifecycle.KillbillService;
import com.ning.billing.mock.glue.MockClockModule;
import com.ning.billing.mock.glue.MockJunctionModule;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.DefaultCallContextFactory;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
index 76aa307..e6889a3 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/TestInvoiceDispatcher.java
@@ -54,7 +54,7 @@ import com.ning.billing.invoice.notification.NextBillingDateNotifier;
import com.ning.billing.invoice.notification.NullInvoiceNotifier;
import com.ning.billing.invoice.tests.InvoicingTestBase;
import com.ning.billing.mock.api.MockBillCycleDay;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.InternalTenantContext;
diff --git a/invoice/src/test/java/com/ning/billing/invoice/tests/TestChargeBacks.java b/invoice/src/test/java/com/ning/billing/invoice/tests/TestChargeBacks.java
index 01576b8..28f1bc9 100644
--- a/invoice/src/test/java/com/ning/billing/invoice/tests/TestChargeBacks.java
+++ b/invoice/src/test/java/com/ning/billing/invoice/tests/TestChargeBacks.java
@@ -51,10 +51,10 @@ import com.ning.billing.invoice.dao.InvoiceSqlDao;
import com.ning.billing.invoice.glue.InvoiceModuleWithEmbeddedDb;
import com.ning.billing.invoice.notification.MockNextBillingDatePoster;
import com.ning.billing.invoice.notification.NextBillingDatePoster;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.clock.ClockMock;
+import com.ning.billing.util.svcsapi.bus.Bus;
public class TestChargeBacks extends InvoiceTestSuiteWithEmbeddedDB {
diff --git a/junction/src/main/java/com/ning/billing/junction/dao/BlockingStateSqlDao.java b/junction/src/main/java/com/ning/billing/junction/dao/BlockingStateSqlDao.java
index c84933c..776e144 100644
--- a/junction/src/main/java/com/ning/billing/junction/dao/BlockingStateSqlDao.java
+++ b/junction/src/main/java/com/ning/billing/junction/dao/BlockingStateSqlDao.java
@@ -95,7 +95,7 @@ public interface BlockingStateSqlDao extends BlockingStateDao, CloseMe, Transmog
final boolean blockBilling;
final Type type;
try {
- timestamp = new DateTime(r.getDate("created_date"));
+ timestamp = getDateTime(r, "created_date");
blockableId = UUID.fromString(r.getString("id"));
stateName = r.getString("state") == null ? DefaultBlockingState.CLEAR_STATE_NAME : r.getString("state");
type = Type.get(r.getString("type"));
diff --git a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
index 1b849c3..3272f2c 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/applicator/OverdueStateApplicator.java
@@ -45,7 +45,6 @@ import com.ning.billing.overdue.OverdueService;
import com.ning.billing.overdue.OverdueState;
import com.ning.billing.overdue.config.api.BillingState;
import com.ning.billing.overdue.config.api.OverdueException;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.clock.Clock;
@@ -57,6 +56,7 @@ import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.entitlement.EntitlementInternalApi;
import com.ning.billing.util.svcapi.junction.BlockingApi;
import com.ning.billing.util.svcapi.junction.DefaultBlockingState;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
@@ -129,7 +129,7 @@ public class OverdueStateApplicator<T extends Blockable> {
}
try {
- bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()));
+ bus.post(createOverdueEvent(overdueable, previousOverdueStateName, nextOverdueState.getName()), context);
} catch (Exception e) {
log.error("Error posting overdue change event to bus", e);
}
diff --git a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
index acce124..a128a26 100644
--- a/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
+++ b/overdue/src/main/java/com/ning/billing/overdue/service/DefaultOverdueService.java
@@ -32,8 +32,8 @@ import com.ning.billing.overdue.api.DefaultOverdueUserApi;
import com.ning.billing.overdue.config.OverdueConfig;
import com.ning.billing.overdue.listener.OverdueListener;
import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
-import com.ning.billing.util.bus.Bus.EventBusException;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.config.XMLLoader;
public class DefaultOverdueService implements ExtendedOverdueService {
diff --git a/overdue/src/test/java/com/ning/billing/overdue/applicator/TestOverdueStateApplicator.java b/overdue/src/test/java/com/ning/billing/overdue/applicator/TestOverdueStateApplicator.java
index 7ce68f7..abd5869 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/applicator/TestOverdueStateApplicator.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/applicator/TestOverdueStateApplicator.java
@@ -34,7 +34,7 @@ import com.ning.billing.overdue.OverdueChangeEvent;
import com.ning.billing.overdue.OverdueState;
import com.ning.billing.overdue.OverdueTestBase;
import com.ning.billing.overdue.config.OverdueConfig;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.config.XMLLoader;
import com.ning.billing.util.svcapi.junction.DefaultBlockingState;
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index ccc589c..babb41e 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -55,7 +55,7 @@ import com.ning.billing.overdue.OverdueProperties;
import com.ning.billing.overdue.OverdueTestSuiteWithEmbeddedDB;
import com.ning.billing.overdue.glue.DefaultOverdueModule;
import com.ning.billing.overdue.listener.OverdueListener;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.DefaultCallContextFactory;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
diff --git a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestBase.java b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestBase.java
index 64d8524..5032bb8 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/OverdueTestBase.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/OverdueTestBase.java
@@ -55,7 +55,7 @@ import com.ning.billing.overdue.config.OverdueConfig;
import com.ning.billing.overdue.glue.DefaultOverdueModule;
import com.ning.billing.overdue.service.DefaultOverdueService;
import com.ning.billing.overdue.wrapper.OverdueWrapperFactory;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.callcontext.TenantContext;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.email.EmailModule;
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
index 6419c15..4b79f9e 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentMethodProcessor.java
@@ -44,12 +44,12 @@ import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
import com.ning.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
import com.ning.billing.payment.provider.ExternalPaymentProviderPlugin;
import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
diff --git a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
index 03f1f95..bb5e678 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/PaymentProcessor.java
@@ -59,7 +59,6 @@ import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
import com.ning.billing.payment.retry.AutoPayRetryService.AutoPayRetryServiceScheduler;
import com.ning.billing.payment.retry.FailedPaymentRetryService.FailedPaymentRetryServiceScheduler;
import com.ning.billing.payment.retry.PluginFailureRetryService.PluginFailureRetryServiceScheduler;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.bus.BusEvent;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -67,6 +66,7 @@ import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
@@ -214,7 +214,7 @@ public class PaymentProcessor extends ProcessorBase {
// This means that events will be posted for null and zero dollar invoices (e.g. trials).
final PaymentErrorEvent event = new DefaultPaymentErrorEvent(account.getId(), invoiceId, null,
ErrorCode.PAYMENT_NO_DEFAULT_PAYMENT_METHOD.toString(), context.getUserToken());
- postPaymentEvent(event, account.getId());
+ postPaymentEvent(event, account.getId(), context);
throw e;
}
@@ -484,7 +484,7 @@ public class PaymentProcessor extends ProcessorBase {
throw new PaymentApiException(ErrorCode.INVOICE_NOT_FOUND, invoice.getId(), e.toString());
} finally {
if (event != null) {
- postPaymentEvent(event, account.getId());
+ postPaymentEvent(event, account.getId(), context);
}
}
return new DefaultPayment(payment, allAttempts, Collections.<RefundModelDao>emptyList());
diff --git a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
index 6a9ab3b..d04c1bc 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/ProcessorBase.java
@@ -33,8 +33,6 @@ import com.ning.billing.payment.dao.PaymentMethodModelDao;
import com.ning.billing.payment.plugin.api.PaymentPluginApi;
import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
import com.ning.billing.util.api.TagApiException;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.bus.BusEvent;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
@@ -45,6 +43,8 @@ import com.ning.billing.util.globallocker.GlobalLocker.LockerType;
import com.ning.billing.util.globallocker.LockFailedException;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.tag.ControlTagType;
import com.ning.billing.util.tag.Tag;
@@ -125,12 +125,12 @@ public abstract class ProcessorBase {
return getPaymentProviderPlugin(paymentMethodId, context);
}
- protected void postPaymentEvent(final BusEvent ev, final UUID accountId) {
+ protected void postPaymentEvent(final BusEvent ev, final UUID accountId, final InternalCallContext context) {
if (ev == null) {
return;
}
try {
- eventBus.post(ev);
+ eventBus.post(ev, context);
} catch (EventBusException e) {
log.error("Failed to post Payment event event for account {} ", accountId, e);
}
diff --git a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
index f536241..97e3848 100644
--- a/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/core/RefundProcessor.java
@@ -49,7 +49,6 @@ import com.ning.billing.payment.dao.RefundModelDao.RefundStatus;
import com.ning.billing.payment.plugin.api.PaymentPluginApi;
import com.ning.billing.payment.plugin.api.PaymentPluginApiException;
import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -58,6 +57,7 @@ import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.base.Function;
import com.google.common.base.Objects;
diff --git a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
index cc97e10..1991433 100644
--- a/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
+++ b/payment/src/main/java/com/ning/billing/payment/glue/DefaultPaymentService.java
@@ -29,7 +29,7 @@ import com.ning.billing.payment.bus.TagHandler;
import com.ning.billing.payment.retry.AutoPayRetryService;
import com.ning.billing.payment.retry.FailedPaymentRetryService;
import com.ning.billing.payment.retry.PluginFailureRetryService;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.notificationq.NotificationQueueService.NoSuchNotificationQueue;
import com.ning.billing.util.notificationq.NotificationQueueService.NotificationQueueAlreadyExists;
diff --git a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
index 690305d..152f6ba 100644
--- a/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
+++ b/payment/src/test/java/com/ning/billing/payment/api/TestPaymentApi.java
@@ -45,8 +45,8 @@ import com.ning.billing.payment.TestHelper;
import com.ning.billing.payment.api.Payment.PaymentAttempt;
import com.ning.billing.payment.glue.PaymentTestModuleWithMocks;
import com.ning.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.glue.CallContextModule;
diff --git a/payment/src/test/java/com/ning/billing/payment/core/TestPaymentMethodProcessor.java b/payment/src/test/java/com/ning/billing/payment/core/TestPaymentMethodProcessor.java
index 318c4e0..1b73ace 100644
--- a/payment/src/test/java/com/ning/billing/payment/core/TestPaymentMethodProcessor.java
+++ b/payment/src/test/java/com/ning/billing/payment/core/TestPaymentMethodProcessor.java
@@ -32,11 +32,11 @@ import com.ning.billing.payment.api.PaymentMethod;
import com.ning.billing.payment.dao.MockPaymentDao;
import com.ning.billing.payment.provider.DefaultPaymentProviderPluginRegistry;
import com.ning.billing.payment.provider.ExternalPaymentProviderPlugin;
-import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.globallocker.GlobalLocker;
import com.ning.billing.util.svcapi.account.AccountInternalApi;
import com.ning.billing.util.svcapi.tag.TagInternalApi;
+import com.ning.billing.util.svcsapi.bus.Bus;
public class TestPaymentMethodProcessor extends PaymentTestSuite {
diff --git a/payment/src/test/java/com/ning/billing/payment/TestHelper.java b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
index a57697f..60d9ad5 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestHelper.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestHelper.java
@@ -34,8 +34,9 @@ import com.ning.billing.payment.api.PaymentApi;
import com.ning.billing.payment.api.PaymentMethodPlugin;
import com.ning.billing.payment.glue.PaymentTestModuleWithMocks;
import com.ning.billing.payment.provider.DefaultNoOpPaymentMethodPlugin;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.Bus.EventBusException;
+import com.ning.billing.util.callcontext.InternalCallContextFactory;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.callcontext.CallContextFactory;
import com.ning.billing.util.callcontext.CallOrigin;
@@ -52,15 +53,17 @@ public class TestHelper {
private final CallContext context;
private final Bus eventBus;
private final Clock clock;
+ private final InternalCallContextFactory internalCallContextFactory;
@Inject
public TestHelper(final CallContextFactory factory, final AccountUserApi accountUserApi, final InvoicePaymentApi invoicePaymentApi,
- final PaymentApi paymentApi, final Bus eventBus, final Clock clock) {
+ final PaymentApi paymentApi, final Bus eventBus, final Clock clock, final InternalCallContextFactory internalCallContextFactory) {
this.eventBus = eventBus;
this.accountUserApi = accountUserApi;
this.invoicePaymentApi = invoicePaymentApi;
this.paymentApi = paymentApi;
this.clock = clock;
+ this.internalCallContextFactory = internalCallContextFactory;
context = factory.createCallContext(null, "Princess Buttercup", CallOrigin.TEST, UserType.TEST);
}
@@ -94,7 +97,7 @@ public class TestHelper {
invoice.getInvoiceDate(),
context.getUserToken());
- eventBus.post(event);
+ eventBus.post(event, internalCallContextFactory.createInternalCallContext(account.getId(), context));
return invoice;
}
diff --git a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
index a57a177..20a4996 100644
--- a/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
+++ b/payment/src/test/java/com/ning/billing/payment/TestRetryService.java
@@ -48,7 +48,7 @@ import com.ning.billing.payment.provider.MockPaymentProviderPlugin;
import com.ning.billing.payment.provider.PaymentProviderPluginRegistry;
import com.ning.billing.payment.retry.FailedPaymentRetryService;
import com.ning.billing.payment.retry.PluginFailureRetryService;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.clock.ClockMock;
import com.ning.billing.util.glue.CallContextModule;
diff --git a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
index c230b33..8ae4937 100644
--- a/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
+++ b/server/src/main/java/com/ning/billing/server/listeners/KillbillGuiceListener.java
@@ -27,8 +27,8 @@ import com.ning.billing.server.config.KillbillServerConfig;
import com.ning.billing.server.healthchecks.KillbillHealthcheck;
import com.ning.billing.server.modules.KillbillServerModule;
import com.ning.billing.server.security.TenantFilter;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.jetty.base.modules.ServerModuleBuilder;
import com.ning.jetty.core.listeners.SetupServer;
diff --git a/server/src/test/java/com/ning/billing/server/security/TestKillbillJdbcRealm.java b/server/src/test/java/com/ning/billing/server/security/TestKillbillJdbcRealm.java
index 2ec77e1..aa8344d 100644
--- a/server/src/test/java/com/ning/billing/server/security/TestKillbillJdbcRealm.java
+++ b/server/src/test/java/com/ning/billing/server/security/TestKillbillJdbcRealm.java
@@ -33,7 +33,7 @@ import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.server.ServerTestSuiteWithEmbeddedDB;
import com.ning.billing.tenant.api.DefaultTenant;
import com.ning.billing.tenant.dao.DefaultTenantDao;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.jolbox.bonecp.BoneCPConfig;
import com.jolbox.bonecp.BoneCPDataSource;
diff --git a/tenant/src/main/java/com/ning/billing/tenant/dao/DefaultTenantDao.java b/tenant/src/main/java/com/ning/billing/tenant/dao/DefaultTenantDao.java
index 04d2705..e50406a 100644
--- a/tenant/src/main/java/com/ning/billing/tenant/dao/DefaultTenantDao.java
+++ b/tenant/src/main/java/com/ning/billing/tenant/dao/DefaultTenantDao.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ning.billing.tenant.api.Tenant;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.entity.EntityPersistenceException;
diff --git a/tenant/src/test/java/com/ning/billing/tenant/dao/TestDefaultTenantDao.java b/tenant/src/test/java/com/ning/billing/tenant/dao/TestDefaultTenantDao.java
index ad51b08..c3ef3a7 100644
--- a/tenant/src/test/java/com/ning/billing/tenant/dao/TestDefaultTenantDao.java
+++ b/tenant/src/test/java/com/ning/billing/tenant/dao/TestDefaultTenantDao.java
@@ -28,7 +28,7 @@ import org.testng.annotations.Test;
import com.ning.billing.tenant.TenantTestSuiteWithEmbeddedDb;
import com.ning.billing.tenant.api.DefaultTenant;
import com.ning.billing.tenant.security.KillbillCredentialsMatcher;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
public class TestDefaultTenantDao extends TenantTestSuiteWithEmbeddedDb {
diff --git a/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java b/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
index 64370ee..79e0f87 100644
--- a/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
+++ b/util/src/main/java/com/ning/billing/util/bus/DefaultBusService.java
@@ -18,6 +18,8 @@ package com.ning.billing.util.bus;
import com.ning.billing.lifecycle.LifecycleHandlerType;
import com.ning.billing.lifecycle.LifecycleHandlerType.LifecycleLevel;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.google.inject.Inject;
diff --git a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
index 1e082b1..e7d969d 100644
--- a/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/InMemoryBus.java
@@ -25,6 +25,9 @@ import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.billing.util.callcontext.InternalCallContext;
+import com.ning.billing.util.svcsapi.bus.Bus;
+
import com.google.common.eventbus.AsyncEventBus;
public class InMemoryBus implements Bus {
@@ -87,13 +90,13 @@ public class InMemoryBus implements Bus {
}
@Override
- public void post(final BusEvent event) throws EventBusException {
+ public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
checkInitialized("post");
delegate.post(event);
}
@Override
- public void postFromTransaction(final BusEvent event, final Transmogrifier dao) throws EventBusException {
+ public void postFromTransaction(final BusEvent event, final Transmogrifier dao, final InternalCallContext context) throws EventBusException {
checkInitialized("postFromTransaction");
delegate.post(event);
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index a6d6e92..be503b7 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -22,8 +22,6 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import javax.annotation.Nullable;
-
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
@@ -40,6 +38,7 @@ import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.queue.PersistentQueueBase;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
@@ -108,7 +107,8 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
@Override
public int doProcessEvents() {
- final InternalCallContext context = createCallContext(null);
+ // Note: retrieving and clearing bus events is not done per tenant (yet?)
+ final InternalCallContext context = internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM, null);
final List<BusEventEntry> events = getNextBusEvent(context);
if (events.size() == 0) {
@@ -154,36 +154,31 @@ public class PersistentBus extends PersistentQueueBase implements Bus {
}
@Override
- public void post(final BusEvent event) throws EventBusException {
+ public void post(final BusEvent event, final InternalCallContext context) throws EventBusException {
dao.inTransaction(new Transaction<Void, PersistentBusSqlDao>() {
@Override
public Void inTransaction(final PersistentBusSqlDao transactional,
final TransactionStatus status) throws Exception {
- postFromTransaction(event, transactional);
+ postFromTransaction(event, context, transactional);
return null;
}
});
}
@Override
- public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier)
+ public void postFromTransaction(final BusEvent event, final Transmogrifier transmogrifier, final InternalCallContext context)
throws EventBusException {
final PersistentBusSqlDao transactional = transmogrifier.become(PersistentBusSqlDao.class);
- postFromTransaction(event, transactional);
+ postFromTransaction(event, context, transactional);
}
- private void postFromTransaction(final BusEvent event, final PersistentBusSqlDao transactional) {
+ private void postFromTransaction(final BusEvent event, final InternalCallContext context, final PersistentBusSqlDao transactional) {
try {
final String json = objectMapper.writeValueAsString(event);
final BusEventEntry entry = new BusEventEntry(hostname, event.getClass().getName(), json);
- transactional.insertBusEvent(entry, createCallContext(event));
+ transactional.insertBusEvent(entry, context);
} catch (Exception e) {
log.error("Failed to post BusEvent " + event, e);
}
}
-
- private InternalCallContext createCallContext(@Nullable final BusEvent event) {
- return internalCallContextFactory.createInternalCallContext("PersistentBus", CallOrigin.INTERNAL, UserType.SYSTEM,
- event == null ? null : event.getUserToken());
- }
}
diff --git a/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java b/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
index 8ad3651..eb3dea1 100644
--- a/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
+++ b/util/src/main/java/com/ning/billing/util/callcontext/InternalCallContextFactory.java
@@ -144,6 +144,13 @@ public class InternalCallContextFactory {
return new InternalCallContext(tenantRecordId, accountRecordId, context);
}
+ // Used when we need to re-hydrate the context with the account_record_id (when creating the account)
+ public InternalCallContext createInternalCallContext(final Long accountRecordId, final InternalCallContext context) {
+ return new InternalCallContext(context.getTenantRecordId(), accountRecordId, context.getUserToken(), context.getUserName(),
+ context.getCallOrigin(), context.getUserType(), context.getReasonCode(), context.getComment(),
+ context.getCreatedDate(), context.getUpdatedDate());
+ }
+
private Long getTenantRecordId(final TenantContext context) {
// Default to single default tenant (e.g. single tenant mode)
if (context.getTenantId() == null) {
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 3d2261b..814a2a0 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -18,8 +18,8 @@ package com.ning.billing.util.glue;
import org.skife.config.ConfigurationObjectFactory;
-import com.ning.billing.util.bus.Bus;
-import com.ning.billing.util.bus.BusService;
+import com.ning.billing.util.svcsapi.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.BusService;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.bus.PersistentBus;
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
index d7f6ffc..26b13ed 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/AuditedTagDao.java
@@ -33,7 +33,7 @@ import com.ning.billing.ErrorCode;
import com.ning.billing.util.ChangeType;
import com.ning.billing.util.api.TagApiException;
import com.ning.billing.util.api.TagDefinitionApiException;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.dao.AuditedCollectionDaoBase;
@@ -130,7 +130,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
tagEvent = tagEventBuilder.newUserTagCreationEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagEvent, AuditedTagDao.this.tagSqlDao);
+ bus.postFromTransaction(tagEvent, tagSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag creation event for tag " + tag.getId().toString(), e);
}
@@ -180,7 +180,7 @@ public class AuditedTagDao extends AuditedCollectionDaoBase<Tag, Tag> implements
tagEvent = tagEventBuilder.newUserTagDeletionEvent(tag.getId(), objectId, objectType, tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagEvent, tagSqlDao);
+ bus.postFromTransaction(tagEvent, tagSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag deletion event for tag " + tag.getId().toString(), e);
}
diff --git a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
index 1cd1615..5fe367e 100644
--- a/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
+++ b/util/src/main/java/com/ning/billing/util/tag/dao/DefaultTagDefinitionDao.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
import com.ning.billing.ErrorCode;
import com.ning.billing.util.api.TagDefinitionApiException;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalTenantContext;
import com.ning.billing.util.tag.ControlTagType;
@@ -144,7 +144,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionCreationEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+ bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag definition creation event for tag " + tagDefinition.getId(), e);
}
@@ -199,7 +199,7 @@ public class DefaultTagDefinitionDao implements TagDefinitionDao {
tagDefinitionEvent = tagEventBuilder.newUserTagDefinitionDeletionEvent(tagDefinition.getId(), tagDefinition, context.getUserToken());
}
try {
- bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao);
+ bus.postFromTransaction(tagDefinitionEvent, tagDefinitionSqlDao, context);
} catch (Bus.EventBusException e) {
log.warn("Failed to post tag definition deletion event for tag " + tagDefinition.getId(), e);
}
diff --git a/util/src/test/java/com/ning/billing/util/audit/dao/TestDefaultAuditDao.java b/util/src/test/java/com/ning/billing/util/audit/dao/TestDefaultAuditDao.java
index db20a04..84386f2 100644
--- a/util/src/test/java/com/ning/billing/util/audit/dao/TestDefaultAuditDao.java
+++ b/util/src/test/java/com/ning/billing/util/audit/dao/TestDefaultAuditDao.java
@@ -35,7 +35,7 @@ import com.ning.billing.util.api.AuditLevel;
import com.ning.billing.util.api.TagApiException;
import com.ning.billing.util.api.TagDefinitionApiException;
import com.ning.billing.util.audit.AuditLog;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.dao.ObjectType;
import com.ning.billing.util.dao.TableName;
diff --git a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
index 38ac2fc..57a95ef 100644
--- a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
+++ b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
import com.ning.billing.util.bus.BusEvent.BusEventType;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -200,7 +201,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
final MyEventHandler handler = new MyEventHandler(1);
eventBus.register(handler);
- eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
Thread.sleep(50000);
} catch (Exception ignored) {
@@ -214,7 +215,7 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
eventBus.register(handler);
for (int i = 0; i < nbEvents; i++) {
- eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEvent("my-event", (long) i, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
}
final boolean completed = handler.waitForCompletion(10000);
@@ -230,9 +231,9 @@ public abstract class TestEventBusBase extends UtilTestSuiteWithEmbeddedDB {
eventBus.register(handler);
for (int i = 0; i < 5; i++) {
- eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()));
+ eventBus.post(new MyOtherEvent("my-other-event", (double) i, UUID.randomUUID(), BusEventType.BUNDLE_REPAIR.toString()), internalCallContext);
}
- eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+ eventBus.post(new MyEvent("my-event", 11l, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()), internalCallContext);
final boolean completed = handler.waitForCompletion(10000);
Assert.assertEquals(completed, true);
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
index f6dc67d..0b96e06 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestAuditedTagDao.java
@@ -32,7 +32,7 @@ import org.testng.annotations.Test;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
import com.ning.billing.util.api.TagDefinitionApiException;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.bus.BusEvent;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.dao.ObjectType;
diff --git a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
index f9283a2..b2edd12 100644
--- a/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
+++ b/util/src/test/java/com/ning/billing/util/tag/dao/TestDefaultTagDefinitionDao.java
@@ -30,7 +30,7 @@ import org.testng.annotations.Test;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.bus.BusEvent;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.tag.MockTagStoreModuleSql;
diff --git a/util/src/test/java/com/ning/billing/util/tag/MockTagStoreModuleSql.java b/util/src/test/java/com/ning/billing/util/tag/MockTagStoreModuleSql.java
index aafb006..2bdc97c 100644
--- a/util/src/test/java/com/ning/billing/util/tag/MockTagStoreModuleSql.java
+++ b/util/src/test/java/com/ning/billing/util/tag/MockTagStoreModuleSql.java
@@ -21,7 +21,7 @@ import org.skife.jdbi.v2.IDBI;
import com.ning.billing.KillbillTestSuiteWithEmbeddedDB;
import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.mock.glue.MockClockModule;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.bus.InMemoryBus;
import com.ning.billing.util.glue.TagStoreModule;
diff --git a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
index 9bb7802..2363604 100644
--- a/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
+++ b/util/src/test/java/com/ning/billing/util/tag/TestTagStore.java
@@ -37,7 +37,7 @@ import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.UtilTestSuiteWithEmbeddedDB;
import com.ning.billing.util.api.TagApiException;
import com.ning.billing.util.api.TagDefinitionApiException;
-import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.svcsapi.bus.Bus;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.dao.ObjectType;
import com.ning.billing.util.tag.dao.TagDao;