killbill-memoizeit
Changes
analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java 4(+2 -2)
Details
diff --git a/account/src/main/java/com/ning/billing/account/api/DefaultAccount.java b/account/src/main/java/com/ning/billing/account/api/DefaultAccount.java
index 7fc3a32..496fb9b 100644
--- a/account/src/main/java/com/ning/billing/account/api/DefaultAccount.java
+++ b/account/src/main/java/com/ning/billing/account/api/DefaultAccount.java
@@ -53,7 +53,7 @@ public class DefaultAccount extends ExtendedEntityBase implements Account {
private final String updatedBy;
private final DateTime updatedDate;
-
+
//intended for creation and migration
public DefaultAccount(final String createdBy, final DateTime createdDate,
final String updatedBy, final DateTime updatedDate,
diff --git a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
index f944ccb..5dabd40 100644
--- a/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
+++ b/account/src/main/java/com/ning/billing/account/api/user/DefaultAccountCreationEvent.java
@@ -19,6 +19,7 @@ package com.ning.billing.account.api.user;
import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountCreationEvent;
import com.ning.billing.account.api.AccountData;
+import com.ning.billing.account.api.DefaultAccount;
import com.ning.billing.catalog.api.Currency;
import java.util.UUID;
@@ -45,7 +46,7 @@ public class DefaultAccountCreationEvent implements AccountCreationEvent {
public DefaultAccountCreationEvent(Account data, UUID userToken) {
this.id = data.getId();
- this.data = data;
+ this.data = new DefaultAccountData(data);
this.userToken = userToken;
}
@@ -131,6 +132,29 @@ public class DefaultAccountCreationEvent implements AccountCreationEvent {
private final boolean isMigrated;
private final boolean isNotifiedForInvoices;
+
+ public DefaultAccountData(Account d) {
+ this(d.getExternalKey() != null ? d.getExternalKey().toString() : null,
+ d.getName(),
+ d.getFirstNameLength(),
+ d.getEmail(),
+ d.getBillCycleDay(),
+ d.getCurrency() != null ? d.getCurrency().name() : null,
+ d.getPaymentProviderName(),
+ d.getTimeZone() != null ? d.getTimeZone().getID() : null,
+ d.getLocale(),
+ d.getAddress1(),
+ d.getAddress2(),
+ d.getCompanyName(),
+ d.getCity(),
+ d.getStateOrProvince(),
+ d.getPostalCode(),
+ d.getCountry(),
+ d.getPhone(),
+ d.isMigrated(),
+ d.isNotifiedForInvoices());
+ }
+
@JsonCreator
public DefaultAccountData(@JsonProperty("externalKey") String externalKey,
@JsonProperty("name") String name,
diff --git a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
index b68ae6f..4fcc49e 100644
--- a/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
+++ b/account/src/main/java/com/ning/billing/account/dao/AuditedAccountDao.java
@@ -22,32 +22,44 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import com.ning.billing.account.api.AccountEmail;
import com.ning.billing.util.ChangeType;
import com.ning.billing.util.audit.dao.AuditSqlDao;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.customfield.dao.CustomFieldDao;
-import com.ning.billing.util.dao.AuditedDaoBase;
import com.ning.billing.util.entity.EntityPersistenceException;
import com.ning.billing.util.tag.dao.TagDao;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.inject.Inject;
import com.ning.billing.ErrorCode;
import com.ning.billing.account.api.Account;
import com.ning.billing.account.api.AccountApiException;
import com.ning.billing.account.api.AccountChangeEvent;
import com.ning.billing.account.api.AccountCreationEvent;
+import com.ning.billing.account.api.AccountEmail;
import com.ning.billing.account.api.user.DefaultAccountChangeEvent;
import com.ning.billing.account.api.user.DefaultAccountCreationEvent;
import com.ning.billing.util.customfield.CustomField;
import com.ning.billing.util.customfield.dao.CustomFieldSqlDao;
+import com.ning.billing.util.dao.AuditedDaoBase;
import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.tag.Tag;
+
+
+
+
public class AuditedAccountDao extends AuditedDaoBase implements AccountDao {
+
+ private final static Logger log = LoggerFactory.getLogger(AuditedAccountDao.class);
+
private static final String ACCOUNT_EMAIL_HISTORY_TABLE = "account_email_history";
+
private final AccountSqlDao accountSqlDao;
private final AccountEmailSqlDao accountEmailSqlDao;
private final TagDao tagDao;
@@ -140,7 +152,11 @@ public class AuditedAccountDao extends AuditedDaoBase implements AccountDao {
saveTagsFromWithinTransaction(account, transactionalDao, context);
saveCustomFieldsFromWithinTransaction(account, transactionalDao, context);
AccountCreationEvent creationEvent = new DefaultAccountCreationEvent(account, context.getUserToken());
- eventBus.post(creationEvent);
+ try {
+ eventBus.postFromTransaction(creationEvent, transactionalDao);
+ } catch (EventBusException e) {
+ log.warn("Failed to post account creation event for account " + account.getId(), e);
+ }
return null;
}
});
@@ -160,9 +176,9 @@ public class AuditedAccountDao extends AuditedDaoBase implements AccountDao {
try {
accountSqlDao.inTransaction(new Transaction<Void, AccountSqlDao>() {
@Override
- public Void inTransaction(final AccountSqlDao accountSqlDao, final TransactionStatus status) throws EntityPersistenceException, Bus.EventBusException {
+ public Void inTransaction(final AccountSqlDao transactional, final TransactionStatus status) throws EntityPersistenceException, Bus.EventBusException {
String accountId = account.getId().toString();
- Account currentAccount = accountSqlDao.getById(accountId);
+ Account currentAccount = transactional.getById(accountId);
if (currentAccount == null) {
throw new EntityPersistenceException(ErrorCode.ACCOUNT_DOES_NOT_EXIST_FOR_ID, accountId);
}
@@ -172,20 +188,25 @@ public class AuditedAccountDao extends AuditedDaoBase implements AccountDao {
throw new EntityPersistenceException(ErrorCode.ACCOUNT_CANNOT_CHANGE_EXTERNAL_KEY, currentKey);
}
- accountSqlDao.update(account, context);
+ transactional.update(account, context);
UUID historyId = UUID.randomUUID();
+
accountSqlDao.insertAccountHistoryFromTransaction(account, historyId.toString(), ChangeType.UPDATE, context);
AuditSqlDao auditDao = accountSqlDao.become(AuditSqlDao.class);
auditDao.insertAuditFromTransaction("account_history" ,historyId.toString(), ChangeType.INSERT, context);
- saveTagsFromWithinTransaction(account, accountSqlDao, context);
- saveCustomFieldsFromWithinTransaction(account, accountSqlDao, context);
+ saveTagsFromWithinTransaction(account, transactional, context);
+ saveCustomFieldsFromWithinTransaction(account, transactional, context);
AccountChangeEvent changeEvent = new DefaultAccountChangeEvent(account.getId(), context.getUserToken(), currentAccount, account);
if (changeEvent.hasChanges()) {
- eventBus.post(changeEvent);
+ try {
+ eventBus.postFromTransaction(changeEvent, transactional);
+ } catch (EventBusException e) {
+ log.warn("Failed to post account change event for account " + account.getId(), e);
+ }
}
return null;
}
diff --git a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
index 66e49bb..edc1a09 100644
--- a/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
+++ b/account/src/test/java/com/ning/billing/account/dao/AccountDaoTestBase.java
@@ -37,6 +37,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Stage;
import com.ning.billing.account.glue.AccountModuleWithEmbeddedDb;
+import com.ning.billing.dbi.MysqlTestingHelper;
import com.ning.billing.util.bus.DefaultBusService;
import com.ning.billing.util.bus.BusService;
import org.testng.annotations.BeforeMethod;
@@ -91,6 +92,8 @@ public abstract class AccountDaoTestBase {
public Void inTransaction(Handle h, TransactionStatus status) throws Exception {
h.execute("truncate table accounts");
h.execute("truncate table notifications");
+ h.execute("truncate table bus_events");
+ h.execute("truncate table claimed_bus_events");
h.execute("truncate table claimed_notifications");
h.execute("truncate table tag_definitions");
h.execute("truncate table tags");
diff --git a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
index 57d164f..0044758 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/BusinessSubscriptionTransitionRecorder.java
@@ -98,13 +98,13 @@ public class BusinessSubscriptionTransitionRecorder
final SubscriptionBundle bundle = entitlementApi.getBundleFromId(transition.getBundleId());
if (bundle != null) {
transitionKey = bundle.getKey();
-
+
final Account account = accountApi.getAccountById(bundle.getAccountId());
if (account != null) {
accountKey = account.getExternalKey();
currency = account.getCurrency();
}
- }
+ }
// The ISubscriptionTransition interface gives us all the prev/next information we need but the start date
// of the previous plan. We need to retrieve it from our own transitions table
diff --git a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
index a7091f8..b567464 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/api/TestAnalyticsService.java
@@ -157,15 +157,7 @@ public class TestAnalyticsService {
private CatalogService catalogService;
private Catalog catalog;
-
- @BeforeMethod(groups = "slow")
- public void cleanup() throws Exception
- {
- helper.cleanupTable("bst");
- helper.cleanupTable("bac");
- }
-
-
+
@BeforeClass(groups = "slow")
public void startMysql() throws IOException, ClassNotFoundException, SQLException, EntitlementUserApiException {
@@ -176,6 +168,8 @@ public class TestAnalyticsService {
// Killbill generic setup
setupBusAndMySQL();
+ helper.cleanupAllTables();
+
tagDao.create(TAG_ONE, context);
tagDao.create(TAG_TWO, context);
@@ -197,7 +191,6 @@ public class TestAnalyticsService {
}
private void setupBusAndMySQL() throws IOException {
- bus.start();
final String analyticsDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/analytics/ddl.sql"));
final String accountDdl = IOUtils.toString(BusinessSubscriptionTransitionDao.class.getResourceAsStream("/com/ning/billing/account/ddl.sql"));
@@ -216,7 +209,9 @@ public class TestAnalyticsService {
helper.initDb(utilDdl);
helper.initDb(junctionDdl);
- helper.cleanupAllTables();
+ helper.cleanupAllTables();
+
+ bus.start();
}
private void createSubscriptionTransitionEvent(final Account account) throws EntitlementUserApiException {
@@ -296,7 +291,7 @@ public class TestAnalyticsService {
helper.stopMysql();
}
- @Test(groups = "slow")
+ @Test(groups = "slow", enabled=true)
public void testRegisterForNotifications() throws Exception {
// Make sure the service has been instantiated
Assert.assertEquals(service.getName(), "analytics-service");
@@ -312,9 +307,8 @@ public class TestAnalyticsService {
// Send events and wait for the async part...
bus.post(transition);
-
bus.post(accountCreationNotification);
- Thread.sleep(1000);
+ Thread.sleep(5000);
Assert.assertEquals(subscriptionDao.getTransitions(KEY).size(), 1);
Assert.assertEquals(subscriptionDao.getTransitions(KEY).get(0), expectedTransition);
@@ -329,12 +323,12 @@ public class TestAnalyticsService {
// Post the same invoice event again - the invoice balance shouldn't change
bus.post(invoiceCreationNotification);
- Thread.sleep(1000);
+ Thread.sleep(5000);
Assert.assertTrue(accountDao.getAccount(ACCOUNT_KEY).getTotalInvoiceBalance().compareTo(INVOICE_AMOUNT) == 0);
// Test payment integration - the fields have already been populated, just make sure the code is exercised
bus.post(paymentInfoNotification);
- Thread.sleep(1000);
+ Thread.sleep(5000);
Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getPaymentMethod(), PAYMENT_METHOD);
Assert.assertEquals(accountDao.getAccount(ACCOUNT_KEY).getBillingAddressCountry(), CARD_COUNTRY);
diff --git a/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java b/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
index 55aa5d9..3e24a39 100644
--- a/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
+++ b/analytics/src/test/java/com/ning/billing/analytics/dao/TestAnalyticsDao.java
@@ -263,10 +263,8 @@ public class TestAnalyticsDao
Assert.assertEquals(transitions.get(0).getKey(), transition.getKey());
Assert.assertEquals(transitions.get(0).getRequestedTimestamp(), transition.getRequestedTimestamp());
Assert.assertEquals(transitions.get(0).getEvent(), transition.getEvent());
- // Null Plan and Phase doesn't make sense so we turn the subscription into a null
- // STEPH not sure why that fails ?
- //Assert.assertNull(transitions.get(0).getPreviousSubscription());
- //Assert.assertNull(transitions.get(0).getNextSubscription());
+ Assert.assertNull(transitions.get(0).getPreviousSubscription());
+ Assert.assertNull(transitions.get(0).getNextSubscription());
}
@Test(groups = "slow")
diff --git a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegration.java b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegration.java
index 3e8dfb4..f7db5d1 100644
--- a/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegration.java
+++ b/beatrix/src/test/java/com/ning/billing/beatrix/integration/TestIntegration.java
@@ -71,10 +71,6 @@ public class TestIntegration extends TestIntegrationBase {
testBasePlanComplete(startDate, 3, true);
}
-// private void waitForDebug() throws Exception {
-// Thread.sleep(600000);
-// }
-
@Test(groups = {"slow", "stress"}, enabled = false)
public void stressTest() throws Exception {
final int maxIterations = 7;
diff --git a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
index efa2e44..7534014 100644
--- a/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
+++ b/entitlement/src/main/java/com/ning/billing/entitlement/engine/dao/EntitlementSqlDao.java
@@ -134,7 +134,7 @@ public class EntitlementSqlDao implements EntitlementDao {
public SubscriptionBundle createSubscriptionBundle(final SubscriptionBundleData bundle, final CallContext context) {
return bundlesDao.inTransaction(new Transaction<SubscriptionBundle, BundleSqlDao>() {
@Override
- public SubscriptionBundle inTransaction(BundleSqlDao bundlesDao, TransactionStatus status) {
+ public SubscriptionBundle inTransaction(BundleSqlDao transactional, TransactionStatus status) {
bundlesDao.insertBundle(bundle, context);
AuditSqlDao auditSqlDao = bundlesDao.become(AuditSqlDao.class);
@@ -200,13 +200,13 @@ public class EntitlementSqlDao implements EntitlementDao {
subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
@Override
- public Void inTransaction(SubscriptionSqlDao transactionalDao,
+ public Void inTransaction(SubscriptionSqlDao transactional,
TransactionStatus status) throws Exception {
- transactionalDao.updateChargedThroughDate(subscription.getId().toString(), ctd, context);
+ transactional.updateChargedThroughDate(subscription.getId().toString(), ctd, context);
- BundleSqlDao tmpDao = transactionalDao.become(BundleSqlDao.class);
+ BundleSqlDao tmpDao = transactional.become(BundleSqlDao.class);
tmpDao.updateBundleLastSysTime(subscription.getBundleId().toString(), clock.getUTCNow().toDate());
- AuditSqlDao auditSqlDao = transactionalDao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
String subscriptionId = subscription.getId().toString();
auditSqlDao.insertAuditFromTransaction(SUBSCRIPTIONS_TABLE_NAME, subscriptionId, ChangeType.UPDATE, context);
return null;
@@ -218,14 +218,14 @@ public class EntitlementSqlDao implements EntitlementDao {
public void createNextPhaseEvent(final UUID subscriptionId, final EntitlementEvent nextPhase, final CallContext context) {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao dao,
+ public Void inTransaction(EventSqlDao transactional,
TransactionStatus status) throws Exception {
- cancelNextPhaseEventFromTransaction(subscriptionId, dao, context);
- dao.insertEvent(nextPhase, context);
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
+ transactional.insertEvent(nextPhase, context);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, nextPhase.getId().toString(), ChangeType.INSERT, context);
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
nextPhase.getEffectiveDate(),
new EntitlementNotificationKey(nextPhase.getId()));
return null;
@@ -279,23 +279,23 @@ public class EntitlementSqlDao implements EntitlementDao {
subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
@Override
- public Void inTransaction(SubscriptionSqlDao dao,
+ public Void inTransaction(SubscriptionSqlDao transactional,
TransactionStatus status) throws Exception {
- dao.insertSubscription(subscription, context);
+ transactional.insertSubscription(subscription, context);
// STEPH batch as well
- EventSqlDao eventsDaoFromSameTransaction = dao.become(EventSqlDao.class);
+ EventSqlDao eventsDaoFromSameTransaction = transactional.become(EventSqlDao.class);
List<String> eventIds = new ArrayList<String>();
for (final EntitlementEvent cur : initialEvents) {
eventsDaoFromSameTransaction.insertEvent(cur, context);
eventIds.add(cur.getId().toString()); // collect ids for batch audit log insert
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
cur.getEffectiveDate(),
new EntitlementNotificationKey(cur.getId()));
}
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, eventIds, ChangeType.INSERT, context);
return null;
}
@@ -308,20 +308,20 @@ public class EntitlementSqlDao implements EntitlementDao {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao dao,
+ public Void inTransaction(EventSqlDao transactional,
TransactionStatus status) throws Exception {
List<String> eventIds = new ArrayList<String>();
for (final EntitlementEvent cur : recreateEvents) {
- dao.insertEvent(cur, context);
+ transactional.insertEvent(cur, context);
eventIds.add(cur.getId().toString()); // gather event ids for batch audit insert
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
cur.getEffectiveDate(),
new EntitlementNotificationKey(cur.getId()));
}
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, eventIds, ChangeType.INSERT, context);
return null;
}
@@ -333,17 +333,17 @@ public class EntitlementSqlDao implements EntitlementDao {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao dao,
+ public Void inTransaction(EventSqlDao transactional,
TransactionStatus status) throws Exception {
- cancelNextCancelEventFromTransaction(subscriptionId, dao, context);
- cancelNextChangeEventFromTransaction(subscriptionId, dao, context);
- cancelNextPhaseEventFromTransaction(subscriptionId, dao, context);
- dao.insertEvent(cancelEvent, context);
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ cancelNextCancelEventFromTransaction(subscriptionId, transactional, context);
+ cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
+ cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
+ transactional.insertEvent(cancelEvent, context);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
String cancelEventId = cancelEvent.getId().toString();
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, cancelEventId, ChangeType.INSERT, context);
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
cancelEvent.getEffectiveDate(),
new EntitlementNotificationKey(cancelEvent.getId(), seqId));
return null;
@@ -357,12 +357,12 @@ public class EntitlementSqlDao implements EntitlementDao {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao dao,
+ public Void inTransaction(EventSqlDao transactional,
TransactionStatus status) throws Exception {
UUID existingCancelId = null;
Date now = clock.getUTCNow().toDate();
- List<EntitlementEvent> events = dao.getFutureActiveEventForSubscription(subscriptionId.toString(), now);
+ List<EntitlementEvent> events = transactional.getFutureActiveEventForSubscription(subscriptionId.toString(), now);
for (EntitlementEvent cur : events) {
if (cur.getType() == EventType.API_USER && ((ApiEvent) cur).getEventType() == ApiEventType.CANCEL) {
@@ -374,19 +374,19 @@ public class EntitlementSqlDao implements EntitlementDao {
}
if (existingCancelId != null) {
- dao.unactiveEvent(existingCancelId.toString(), context);
+ transactional.unactiveEvent(existingCancelId.toString(), context);
String deactivatedEventId = existingCancelId.toString();
List<String> eventIds = new ArrayList<String>();
for (final EntitlementEvent cur : uncancelEvents) {
- dao.insertEvent(cur, context);
+ transactional.insertEvent(cur, context);
eventIds.add(cur.getId().toString()); // gather event ids for batch insert into audit log
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
cur.getEffectiveDate(),
new EntitlementNotificationKey(cur.getId()));
}
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, deactivatedEventId, ChangeType.UPDATE, context);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, eventIds, ChangeType.INSERT, context);
}
@@ -399,21 +399,21 @@ public class EntitlementSqlDao implements EntitlementDao {
public void changePlan(final UUID subscriptionId, final List<EntitlementEvent> changeEvents, final CallContext context) {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao dao, TransactionStatus status) throws Exception {
- cancelNextChangeEventFromTransaction(subscriptionId, dao, context);
- cancelNextPhaseEventFromTransaction(subscriptionId, dao, context);
+ public Void inTransaction(EventSqlDao transactional, TransactionStatus status) throws Exception {
+ cancelNextChangeEventFromTransaction(subscriptionId, transactional, context);
+ cancelNextPhaseEventFromTransaction(subscriptionId, transactional, context);
List<String> eventIds = new ArrayList<String>();
for (final EntitlementEvent cur : changeEvents) {
- dao.insertEvent(cur, context);
+ transactional.insertEvent(cur, context);
eventIds.add(cur.getId().toString()); // gather event ids for batch audit log insert
- recordFutureNotificationFromTransaction(dao,
+ recordFutureNotificationFromTransaction(transactional,
cur.getEffectiveDate(),
new EntitlementNotificationKey(cur.getId()));
}
- AuditSqlDao auditSqlDao = dao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction(ENTITLEMENT_EVENTS_TABLE_NAME, eventIds, ChangeType.INSERT, context);
return null;
}
@@ -562,11 +562,11 @@ public class EntitlementSqlDao implements EntitlementDao {
eventsDao.inTransaction(new Transaction<Void, EventSqlDao>() {
@Override
- public Void inTransaction(EventSqlDao transEventDao,
+ public Void inTransaction(EventSqlDao transactional,
TransactionStatus status) throws Exception {
- SubscriptionSqlDao transSubDao = transEventDao.become(SubscriptionSqlDao.class);
- BundleSqlDao transBundleDao = transEventDao.become(BundleSqlDao.class);
+ SubscriptionSqlDao transSubDao = transactional.become(SubscriptionSqlDao.class);
+ BundleSqlDao transBundleDao = transactional.become(BundleSqlDao.class);
List<String> bundleIds = new ArrayList<String>();
List<String> subscriptionIds = new ArrayList<String>();
@@ -579,10 +579,10 @@ public class EntitlementSqlDao implements EntitlementDao {
SubscriptionData subData = curSubscription.getData();
for (final EntitlementEvent curEvent : curSubscription.getInitialEvents()) {
- transEventDao.insertEvent(curEvent, context);
+ transactional.insertEvent(curEvent, context);
eventIds.add(curEvent.getId().toString()); // gather event ids for batch audit
- recordFutureNotificationFromTransaction(transEventDao,
+ recordFutureNotificationFromTransaction(transactional,
curEvent.getEffectiveDate(),
new EntitlementNotificationKey(curEvent.getId()));
}
@@ -630,7 +630,7 @@ public class EntitlementSqlDao implements EntitlementDao {
RepairEntitlementEvent busEvent = new DefaultRepairEntitlementEvent(context.getUserToken(), accountId, bundleId, clock.getUTCNow());
eventBus.postFromTransaction(busEvent, transactional);
} catch (EventBusException e) {
- log.warn("Failed to post repair entitlement event for bundle " + bundleId);
+ log.warn("Failed to post repair entitlement event for bundle " + bundleId, e);
}
return null;
}
@@ -662,9 +662,9 @@ public class EntitlementSqlDao implements EntitlementDao {
public void saveCustomFields(final SubscriptionData subscription, final CallContext context) {
subscriptionsDao.inTransaction(new Transaction<Void, SubscriptionSqlDao>() {
@Override
- public Void inTransaction(SubscriptionSqlDao transactionalDao,
+ public Void inTransaction(SubscriptionSqlDao transactional,
TransactionStatus status) throws Exception {
- updateCustomFieldsFromTransaction(transactionalDao, subscription, context);
+ updateCustomFieldsFromTransaction(transactional, subscription, context);
return null;
}
});
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
index e6d7705..0ac07c0 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/api/TestApiBase.java
@@ -151,9 +151,9 @@ public abstract class TestApiBase {
helper = (isSqlTest(dao)) ? g.getInstance(MysqlTestingHelper.class) : null;
((DefaultCatalogService) catalogService).loadCatalog();
- ((DefaultBusService) busService).startBus();
((Engine) entitlementService).initialize();
init(g);
+ ((DefaultBusService) busService).startBus();
}
private static boolean isSqlTest(EntitlementDao theDao) {
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
index f539ce7..31e0da3 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModule.java
@@ -29,7 +29,6 @@ public class MockEngineModule extends DefaultEntitlementModule {
@Override
protected void configure() {
super.configure();
- install(new BusModule());
install(new CatalogModule());
bind(AccountUserApi.class).toInstance(BrainDeadProxyFactory.createBrainDeadProxyFor(AccountUserApi.class));
install(new MockClockModule());
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
index e46e4bc..148b62d 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleMemory.java
@@ -22,6 +22,8 @@ import com.ning.billing.entitlement.api.timeline.RepairEntitlementLifecycleDao;
import com.ning.billing.entitlement.engine.dao.EntitlementDao;
import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoMemory;
import com.ning.billing.entitlement.engine.dao.RepairEntitlementDao;
+import com.ning.billing.util.glue.BusModule;
+import com.ning.billing.util.glue.BusModule.BusType;
import com.ning.billing.util.notificationq.MockNotificationQueueService;
import com.ning.billing.util.notificationq.NotificationQueueService;
@@ -42,6 +44,7 @@ public class MockEngineModuleMemory extends MockEngineModule {
@Override
protected void configure() {
super.configure();
+ install(new BusModule(BusType.MEMORY));
installNotificationQueue();
}
}
diff --git a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
index 9195c91..e1b3742 100644
--- a/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
+++ b/entitlement/src/test/java/com/ning/billing/entitlement/glue/MockEngineModuleSql.java
@@ -32,8 +32,10 @@ import com.ning.billing.entitlement.engine.dao.MockEntitlementDaoSql;
import com.ning.billing.entitlement.engine.dao.RepairEntitlementDao;
+import com.ning.billing.util.glue.BusModule;
import com.ning.billing.util.glue.FieldStoreModule;
import com.ning.billing.util.glue.NotificationQueueModule;
+import com.ning.billing.util.glue.BusModule.BusType;
public class MockEngineModuleSql extends MockEngineModule {
@@ -65,6 +67,7 @@ public class MockEngineModuleSql extends MockEngineModule {
installDBI();
install(new NotificationQueueModule());
install(new FieldStoreModule());
+ install(new BusModule(BusType.PERSISTENT));
super.configure();
}
}
diff --git a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
index b4e4fd3..0b064bb 100644
--- a/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
+++ b/invoice/src/main/java/com/ning/billing/invoice/dao/DefaultInvoiceDao.java
@@ -45,6 +45,7 @@ import com.ning.billing.junction.api.BillingApi;
import com.ning.billing.util.ChangeType;
import com.ning.billing.util.audit.dao.AuditSqlDao;
import com.ning.billing.util.bus.Bus;
+import com.ning.billing.util.bus.Bus.EventBusException;
import com.ning.billing.util.callcontext.CallContext;
import com.ning.billing.util.customfield.CustomField;
import com.ning.billing.util.customfield.dao.CustomFieldSqlDao;
@@ -151,56 +152,56 @@ public class DefaultInvoiceDao implements InvoiceDao {
@Override
public void create(final Invoice invoice, final CallContext context) {
+
+ final InvoiceCreationEvent event = new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
+ invoice.getBalance(), invoice.getCurrency(),
+ invoice.getInvoiceDate(),
+ context.getUserToken());
+
invoiceSqlDao.inTransaction(new Transaction<Void, InvoiceSqlDao>() {
@Override
- public Void inTransaction(final InvoiceSqlDao invoiceDao, final TransactionStatus status) throws Exception {
+ public Void inTransaction(final InvoiceSqlDao transactional, final TransactionStatus status) throws Exception {
// STEPH this seems useless
- Invoice currentInvoice = invoiceDao.getById(invoice.getId().toString());
+ Invoice currentInvoice = transactional.getById(invoice.getId().toString());
if (currentInvoice == null) {
- invoiceDao.create(invoice, context);
+ transactional.create(invoice, context);
List<InvoiceItem> recurringInvoiceItems = invoice.getInvoiceItems(RecurringInvoiceItem.class);
- RecurringInvoiceItemSqlDao recurringInvoiceItemDao = invoiceDao.become(RecurringInvoiceItemSqlDao.class);
+ RecurringInvoiceItemSqlDao recurringInvoiceItemDao = transactional.become(RecurringInvoiceItemSqlDao.class);
recurringInvoiceItemDao.batchCreateFromTransaction(recurringInvoiceItems, context);
- notifyOfFutureBillingEvents(invoiceDao, recurringInvoiceItems);
+ notifyOfFutureBillingEvents(transactional, recurringInvoiceItems);
List<InvoiceItem> fixedPriceInvoiceItems = invoice.getInvoiceItems(FixedPriceInvoiceItem.class);
- FixedPriceInvoiceItemSqlDao fixedPriceInvoiceItemDao = invoiceDao.become(FixedPriceInvoiceItemSqlDao.class);
+ FixedPriceInvoiceItemSqlDao fixedPriceInvoiceItemDao = transactional.become(FixedPriceInvoiceItemSqlDao.class);
fixedPriceInvoiceItemDao.batchCreateFromTransaction(fixedPriceInvoiceItems, context);
- setChargedThroughDates(invoiceDao, fixedPriceInvoiceItems, recurringInvoiceItems, context);
+ setChargedThroughDates(transactional, fixedPriceInvoiceItems, recurringInvoiceItems, context);
// STEPH Why do we need that? Are the payments not always null at this point?
List<InvoicePayment> invoicePayments = invoice.getPayments();
- InvoicePaymentSqlDao invoicePaymentSqlDao = invoiceDao.become(InvoicePaymentSqlDao.class);
+ InvoicePaymentSqlDao invoicePaymentSqlDao = transactional.become(InvoicePaymentSqlDao.class);
invoicePaymentSqlDao.batchCreateFromTransaction(invoicePayments, context);
- AuditSqlDao auditSqlDao = invoiceDao.become(AuditSqlDao.class);
+ AuditSqlDao auditSqlDao = transactional.become(AuditSqlDao.class);
auditSqlDao.insertAuditFromTransaction("invoices", invoice.getId().toString(), ChangeType.INSERT, context);
auditSqlDao.insertAuditFromTransaction("recurring_invoice_items", getIdsFromInvoiceItems(recurringInvoiceItems), ChangeType.INSERT, context);
auditSqlDao.insertAuditFromTransaction("fixed_invoice_items", getIdsFromInvoiceItems(fixedPriceInvoiceItems), ChangeType.INSERT, context);
auditSqlDao.insertAuditFromTransaction("invoice_payments", getIdsFromInvoicePayments(invoicePayments), ChangeType.INSERT, context);
}
-
+ try {
+ eventBus.postFromTransaction(event, transactional);
+ } catch (EventBusException e) {
+ log.warn("Failed to post invoice event for invoiceId " + invoice.getId(), e);
+ }
return null;
}
});
- // TODO: move this inside the transaction once the bus is persistent
- InvoiceCreationEvent event;
- event = new DefaultInvoiceCreationEvent(invoice.getId(), invoice.getAccountId(),
- invoice.getBalance(), invoice.getCurrency(),
- invoice.getInvoiceDate(),
- context.getUserToken());
- try {
- eventBus.post(event);
- } catch (Bus.EventBusException e) {
- throw new RuntimeException(e);
- }
+
}
private List<String> getIdsFromInvoiceItems(List<InvoiceItem> invoiceItems) {
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
index 18c6dd4..29883f1 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/AccountResource.java
@@ -108,7 +108,6 @@ public class AccountResource implements BaseJaxrsResource {
AccountJson json = new AccountJson(account);
return Response.status(Status.OK).entity(json).build();
} catch (AccountApiException e) {
- log.warn("Failed to find account.", e);
return Response.status(Status.NO_CONTENT).build();
}
@@ -131,7 +130,6 @@ public class AccountResource implements BaseJaxrsResource {
});
return Response.status(Status.OK).entity(result).build();
} catch (AccountApiException e) {
- log.warn("Failed to find account.", e);
return Response.status(Status.NO_CONTENT).build();
}
}
@@ -151,7 +149,6 @@ public class AccountResource implements BaseJaxrsResource {
AccountJson json = new AccountJson(account);
return Response.status(Status.OK).entity(json).build();
} catch (AccountApiException e) {
- log.warn("Failed to find account.", e);
return Response.status(Status.NO_CONTENT).build();
}
}
@@ -242,7 +239,6 @@ public class AccountResource implements BaseJaxrsResource {
AccountTimelineJson json = new AccountTimelineJson(account, invoices, payments, bundlesTimeline);
return Response.status(Status.OK).entity(json).build();
} catch (AccountApiException e) {
- log.warn("Failed to find account.", e);
return Response.status(Status.NO_CONTENT).build();
} catch (EntitlementRepairException e) {
log.error(e.getMessage());
diff --git a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/SubscriptionResource.java b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/SubscriptionResource.java
index 09db7f9..259fe9f 100644
--- a/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/SubscriptionResource.java
+++ b/jaxrs/src/main/java/com/ning/billing/jaxrs/resources/SubscriptionResource.java
@@ -74,8 +74,6 @@ public class SubscriptionResource implements BaseJaxrsResource {
private final JaxrsUriBuilder uriBuilder;
private final KillbillEventHandler killbillHandler;
-
-
@Inject
public SubscriptionResource(final JaxrsUriBuilder uriBuilder, final EntitlementUserApi entitlementApi,
final Clock clock, final Context context, final KillbillEventHandler killbillHandler) {
@@ -239,7 +237,6 @@ public class SubscriptionResource implements BaseJaxrsResource {
return Response.status(Status.OK).build();
} catch (EntitlementUserApiException e) {
if(e.getCode() == ErrorCode.ENT_INVALID_SUBSCRIPTION_ID.getCode()) {
- log.info(String.format("Failed to find subscription %s", subscriptionId), e);
return Response.status(Status.NO_CONTENT).build();
} else {
log.info(String.format("Failed to uncancel plan for subscription %s", subscriptionId), e);
@@ -274,7 +271,6 @@ public class SubscriptionResource implements BaseJaxrsResource {
return Response.status(Status.OK).build();
} catch (EntitlementUserApiException e) {
if(e.getCode() == ErrorCode.ENT_INVALID_SUBSCRIPTION_ID.getCode()) {
- log.info(String.format("Failed to find subscription %s", subscriptionId), e);
return Response.status(Status.NO_CONTENT).build();
} else {
throw e;
diff --git a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
index 3086274..34890a4 100644
--- a/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
+++ b/overdue/src/test/java/com/ning/billing/overdue/notification/TestOverdueCheckNotifier.java
@@ -141,7 +141,7 @@ public class TestOverdueCheckNotifier {
clock = g.getInstance(Clock.class);
IDBI dbi = g.getInstance(IDBI.class);
- dao = dbi.onDemand(DummySqlTest.class);
+
eventBus = g.getInstance(Bus.class);
helper = g.getInstance(MysqlTestingHelper.class);
notificationQueueService = g.getInstance(NotificationQueueService.class);
@@ -161,7 +161,7 @@ public class TestOverdueCheckNotifier {
eventBus.start();
notifier.initialize();
notifier.start();
-
+ dao = dbi.onDemand(DummySqlTest.class);
}
private void startMysql() throws IOException, ClassNotFoundException, SQLException {
diff --git a/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java b/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
index a1ccd55..3102e5c 100644
--- a/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
+++ b/payment/src/main/java/com/ning/billing/payment/RequestProcessor.java
@@ -76,15 +76,15 @@ public class RequestProcessor {
List<Either<PaymentErrorEvent, PaymentInfoEvent>> results = paymentApi.createPayment(account, Arrays.asList(event.getInvoiceId().toString()), context);
if (!results.isEmpty()) {
Either<PaymentErrorEvent, PaymentInfoEvent> result = results.get(0);
- eventBus.post(result.isLeft() ? result.getLeft() : result.getRight());
+ try {
+ eventBus.post(result.isLeft() ? result.getLeft() : result.getRight());
+ } catch (EventBusException e) {
+ log.error("Failed to post Payment event event for account {} ", account.getId(), e);
+ }
}
}
- }
- catch(AccountApiException e) {
+ } catch(AccountApiException e) {
log.warn("could not process invoice payment", e);
}
- catch (EventBusException ex) {
- throw new RuntimeException(ex);
- }
}
}
diff --git a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
index 004e3ab..a556028 100644
--- a/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
+++ b/util/src/main/java/com/ning/billing/util/bus/PersistentBus.java
@@ -103,7 +103,6 @@ public class PersistentBus implements Bus {
}
-
@Override
public void start() {
@@ -111,8 +110,9 @@ public class PersistentBus implements Bus {
curActiveThreads = 0;
final PersistentBus thePersistentBus = this;
-
final CountDownLatch doneInitialization = new CountDownLatch(NB_BUS_THREADS);
+
+ log.info("Starting Persistent BUS with {} threads, countDownLatch = {}", NB_BUS_THREADS, doneInitialization.getCount());
for (int i = 0; i < NB_BUS_THREADS; i++) {
executor.execute(new Runnable() {
@@ -126,7 +126,7 @@ public class PersistentBus implements Bus {
synchronized(thePersistentBus) {
curActiveThreads++;
}
-
+
doneInitialization.countDown();
try {
@@ -172,7 +172,12 @@ public class PersistentBus implements Bus {
});
}
try {
- doneInitialization.await(TIMEOUT_MSEC, TimeUnit.SECONDS);
+ boolean success = doneInitialization.await(TIMEOUT_MSEC, TimeUnit.MILLISECONDS);
+ if (!success) {
+ log.warn("Failed to wait for all threads to be started, got {}/{}", doneInitialization.getCount(), NB_BUS_THREADS);
+ } else {
+ log.info("Done waiting for all threads to be started, got {}/{}", doneInitialization.getCount(), NB_BUS_THREADS);
+ }
} catch (InterruptedException e) {
log.warn("PersistentBus start sequence got interrupted...");
}
@@ -198,10 +203,10 @@ public class PersistentBus implements Bus {
int result = 0;
for (final BusEventEntry cur : events) {
- BusEvent e = deserializeBusEvent(cur.getBusEventClass(), cur.getBusEventJson());
+ BusEvent evt = deserializeBusEvent(cur.getBusEventClass(), cur.getBusEventJson());
result++;
- // STEPH need to look at failure cases
- eventBusDelegate.post(e);
+ // STEPH exception handling is done by GUAVA-- logged a bug Issue-780
+ eventBusDelegate.post(evt);
dao.clearBusEvent(cur.getId(), hostname);
}
return result;
diff --git a/util/src/main/java/com/ning/billing/util/glue/BusModule.java b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
index 2d1d30b..656253d 100644
--- a/util/src/main/java/com/ning/billing/util/glue/BusModule.java
+++ b/util/src/main/java/com/ning/billing/util/glue/BusModule.java
@@ -29,8 +29,7 @@ public class BusModule extends AbstractModule {
public BusModule() {
super();
- // Default to Memory at this point
- type = BusType.MEMORY;
+ type = BusType.PERSISTENT;
}
public BusModule(BusType type) {
diff --git a/util/src/main/resources/com/ning/billing/util/ddl.sql b/util/src/main/resources/com/ning/billing/util/ddl.sql
index 2162500..9a241a0 100644
--- a/util/src/main/resources/com/ning/billing/util/ddl.sql
+++ b/util/src/main/resources/com/ning/billing/util/ddl.sql
@@ -126,7 +126,7 @@ DROP TABLE IF EXISTS bus_events;
CREATE TABLE bus_events (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
class_name varchar(128) NOT NULL,
- event_json varchar(1024) NOT NULL,
+ event_json varchar(2048) NOT NULL,
created_dt datetime NOT NULL,
processing_owner char(50) DEFAULT NULL,
processing_available_dt datetime DEFAULT NULL,
diff --git a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
index 371d1f9..ddc89ed 100644
--- a/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
+++ b/util/src/test/java/com/ning/billing/util/bus/TestEventBusBase.java
@@ -49,7 +49,8 @@ public class TestEventBusBase {
eventBus.stop();
}
- public static final class MyEvent implements BusEvent {
+
+ public static class MyEvent implements BusEvent {
private String name;
private Long value;
@@ -91,6 +92,18 @@ public class TestEventBusBase {
return type;
}
}
+
+ public static final class MyEventWithException extends MyEvent {
+
+ @JsonCreator
+ public MyEventWithException(@JsonProperty("name") String name,
+ @JsonProperty("value") Long value,
+ @JsonProperty("token") UUID token,
+ @JsonProperty("type") String type) {
+ super(name, value, token, type);
+ }
+ }
+
public static final class MyOtherEvent implements BusEvent {
@@ -135,6 +148,12 @@ public class TestEventBusBase {
return type;
}
}
+
+ public static class MyEventHandlerException extends RuntimeException {
+ public MyEventHandlerException(String msg) {
+ super(msg);
+ }
+ }
public static class MyEventHandler {
@@ -158,6 +177,11 @@ public class TestEventBusBase {
//log.debug("Got event {} {}", event.name, event.value);
}
+ @Subscribe
+ public synchronized void processEvent(MyEventWithException event) {
+ throw new MyEventHandlerException("FAIL");
+ }
+
public synchronized boolean waitForCompletion(long timeoutMs) {
long ini = System.currentTimeMillis();
@@ -176,6 +200,20 @@ public class TestEventBusBase {
}
}
+ public void testSimpleWithException() {
+ try {
+ MyEventHandler handler = new MyEventHandler(1);
+ eventBus.register(handler);
+
+ eventBus.post(new MyEventWithException("my-event", 1L, UUID.randomUUID(), BusEventType.ACCOUNT_CHANGE.toString()));
+
+ Thread.sleep(50000);
+ } catch (Exception e) {
+
+ }
+
+ }
+
public void testSimple() {
try {
diff --git a/util/src/test/java/com/ning/billing/util/bus/TestPersistentEventBus.java b/util/src/test/java/com/ning/billing/util/bus/TestPersistentEventBus.java
index 530a4e7..b42b694 100644
--- a/util/src/test/java/com/ning/billing/util/bus/TestPersistentEventBus.java
+++ b/util/src/test/java/com/ning/billing/util/bus/TestPersistentEventBus.java
@@ -82,9 +82,16 @@ public class TestPersistentEventBus extends TestEventBusBase {
}
}
- @Test(groups = "slow")
+ @Test(groups = {"slow"})
public void testSimple() {
super.testSimple();
}
+
+ // Until Guava fixes exception handling, r13?
+ @Test(groups={"slow"}, enabled=false)
+ public void testSimpleWithException() {
+ super.testSimpleWithException();
+
+ }
}