diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index 6ceacf9..0aee461 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -16,8 +16,11 @@
package com.ning.billing.analytics;
-import com.ning.billing.account.api.AccountApiException;
-import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -51,6 +54,7 @@ import com.google.inject.Inject;
public class AnalyticsListener {
+ private static final Logger log = LoggerFactory.getLogger(AnalyticsListener.class);
private static final int NB_LOCK_TRY = 5;
private final BusinessSubscriptionTransitionDao bstDao;
@@ -77,40 +81,55 @@ public class AnalyticsListener {
this.bosDao = bosDao;
this.bipDao = bipDao;
this.tagDao = tagDao;
- // TODO: use accountRecordId when switching to internal events and acquire the lock for all refreshes
this.locker = locker;
this.internalCallContextFactory = internalCallContextFactory;
}
@Subscribe
- public void handleEffectiveSubscriptionTransitionChange(final EffectiveSubscriptionInternalEvent eventEffective) throws AccountApiException, EntitlementUserApiException {
- // The event is used as a trigger to rebuild all transitions for this bundle
- bstDao.rebuildTransitionsForBundle(eventEffective.getBundleId(), createCallContext(eventEffective));
+ public void handleEffectiveSubscriptionTransitionChange(final EffectiveSubscriptionInternalEvent eventEffective) {
+ updateWithAccountLock(eventEffective.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // The event is used as a trigger to rebuild all transitions for this bundle
+ bstDao.rebuildTransitionsForBundle(eventEffective.getBundleId(), createCallContext(eventEffective));
+ return null;
+ }
+ });
}
@Subscribe
- public void handleRequestedSubscriptionTransitionChange(final RequestedSubscriptionInternalEvent eventRequested) throws AccountApiException, EntitlementUserApiException {
- // The event is used as a trigger to rebuild all transitions for this bundle
- bstDao.rebuildTransitionsForBundle(eventRequested.getBundleId(), createCallContext(eventRequested));
+ public void handleRequestedSubscriptionTransitionChange(final RequestedSubscriptionInternalEvent eventRequested) {
+ updateWithAccountLock(eventRequested.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // The event is used as a trigger to rebuild all transitions for this bundle
+ bstDao.rebuildTransitionsForBundle(eventRequested.getBundleId(), createCallContext(eventRequested));
+ return null;
+ }
+ });
}
@Subscribe
public void handleRepairEntitlement(final RepairEntitlementInternalEvent event) {
- // In case of repair, just rebuild all transitions
- bstDao.rebuildTransitionsForBundle(event.getBundleId(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // In case of repair, just rebuild all transitions
+ bstDao.rebuildTransitionsForBundle(event.getBundleId(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
public void handleAccountCreation(final AccountCreationInternalEvent event) {
- GlobalLock lock = null;
- try {
- lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, event.getId().toString(), NB_LOCK_TRY);
- bacDao.accountUpdated(event.getId(), createCallContext(event));
- } finally {
- if (lock != null) {
- lock.release();
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bacDao.accountUpdated(event.getId(), createCallContext(event));
+ return null;
}
- }
+ });
}
@Subscribe
@@ -119,21 +138,25 @@ public class AnalyticsListener {
return;
}
- GlobalLock lock = null;
- try {
- lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, event.getAccountId().toString(), NB_LOCK_TRY);
- bacDao.accountUpdated(event.getAccountId(), createCallContext(event));
- } finally {
- if (lock != null) {
- lock.release();
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bacDao.accountUpdated(event.getAccountId(), createCallContext(event));
+ return null;
}
- }
+ });
}
@Subscribe
public void handleInvoiceCreation(final InvoiceCreationInternalEvent event) {
- // The event is used as a trigger to rebuild all invoices and invoice items for this account
- invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // The event is used as a trigger to rebuild all invoices and invoice items for this account
+ invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
@@ -143,53 +166,101 @@ public class AnalyticsListener {
@Subscribe
public void handleInvoiceAdjustment(final InvoiceAdjustmentInternalEvent event) {
- // The event is used as a trigger to rebuild all invoices and invoice items for this account
- invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // The event is used as a trigger to rebuild all invoices and invoice items for this account
+ invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
public void handlePaymentInfo(final PaymentInfoInternalEvent paymentInfo) {
- bipDao.invoicePaymentPosted(paymentInfo.getAccountId(),
- paymentInfo.getPaymentId(),
- paymentInfo.getExtFirstPaymentRefId(),
- paymentInfo.getExtSecondPaymentRefId(),
- paymentInfo.getStatus().toString(),
- createCallContext(paymentInfo));
+ updateWithAccountLock(paymentInfo.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bipDao.invoicePaymentPosted(paymentInfo.getAccountId(),
+ paymentInfo.getPaymentId(),
+ paymentInfo.getExtFirstPaymentRefId(),
+ paymentInfo.getExtSecondPaymentRefId(),
+ paymentInfo.getStatus().toString(),
+ createCallContext(paymentInfo));
+ return null;
+ }
+ });
}
@Subscribe
public void handlePaymentError(final PaymentErrorInternalEvent paymentError) {
- bipDao.invoicePaymentPosted(paymentError.getAccountId(),
- paymentError.getPaymentId(),
- null,
- null,
- paymentError.getMessage(),
- createCallContext(paymentError));
+ updateWithAccountLock(paymentError.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bipDao.invoicePaymentPosted(paymentError.getAccountId(),
+ paymentError.getPaymentId(),
+ null,
+ null,
+ paymentError.getMessage(),
+ createCallContext(paymentError));
+ return null;
+ }
+ });
}
@Subscribe
public void handleOverdueChange(final OverdueChangeInternalEvent changeEvent) {
- bosDao.overdueStatusChanged(changeEvent.getOverdueObjectType(), changeEvent.getOverdueObjectId(), createCallContext(changeEvent));
+ updateWithAccountLock(changeEvent.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bosDao.overdueStatusChanged(changeEvent.getOverdueObjectType(), changeEvent.getOverdueObjectId(), createCallContext(changeEvent));
+ return null;
+ }
+ });
}
@Subscribe
public void handleControlTagCreation(final ControlTagCreationInternalEvent event) {
- tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
public void handleControlTagDeletion(final ControlTagDeletionInternalEvent event) {
- tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
public void handleUserTagCreation(final UserTagCreationInternalEvent event) {
- tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
public void handleUserTagDeletion(final UserTagDeletionInternalEvent event) {
- tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+ return null;
+ }
+ });
}
@Subscribe
@@ -213,6 +284,24 @@ public class AnalyticsListener {
}
private InternalCallContext createCallContext(final BusInternalEvent event) {
- return internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "AnalyticsService", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ return internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(),
+ "AnalyticsService", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+ }
+
+ private <T> T updateWithAccountLock(final Long accountRecordId, final Callable<T> task) {
+ GlobalLock lock = null;
+ try {
+ final String lockKey = accountRecordId == null ? "0" : accountRecordId.toString();
+ lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, lockKey, NB_LOCK_TRY);
+ return task.call();
+ } catch (Exception e) {
+ log.warn("Exception while refreshing analytics tables", e);
+ } finally {
+ if (lock != null) {
+ lock.release();
+ }
+ }
+
+ return null;
}
}