killbill-memoizeit

analytics: use account lock for all refresh calls Signed-off-by:

11/19/2012 8:15:25 PM

Details

diff --git a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
index 6ceacf9..0aee461 100644
--- a/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
+++ b/analytics/src/main/java/com/ning/billing/analytics/AnalyticsListener.java
@@ -16,8 +16,11 @@
 
 package com.ning.billing.analytics;
 
-import com.ning.billing.account.api.AccountApiException;
-import com.ning.billing.entitlement.api.user.EntitlementUserApiException;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.billing.util.callcontext.CallOrigin;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalCallContextFactory;
@@ -51,6 +54,7 @@ import com.google.inject.Inject;
 
 public class AnalyticsListener {
 
+    private static final Logger log = LoggerFactory.getLogger(AnalyticsListener.class);
     private static final int NB_LOCK_TRY = 5;
 
     private final BusinessSubscriptionTransitionDao bstDao;
@@ -77,40 +81,55 @@ public class AnalyticsListener {
         this.bosDao = bosDao;
         this.bipDao = bipDao;
         this.tagDao = tagDao;
-        // TODO: use accountRecordId when switching to internal events and acquire the lock for all refreshes
         this.locker = locker;
         this.internalCallContextFactory = internalCallContextFactory;
     }
 
     @Subscribe
-    public void handleEffectiveSubscriptionTransitionChange(final EffectiveSubscriptionInternalEvent eventEffective) throws AccountApiException, EntitlementUserApiException {
-        // The event is used as a trigger to rebuild all transitions for this bundle
-        bstDao.rebuildTransitionsForBundle(eventEffective.getBundleId(), createCallContext(eventEffective));
+    public void handleEffectiveSubscriptionTransitionChange(final EffectiveSubscriptionInternalEvent eventEffective) {
+        updateWithAccountLock(eventEffective.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // The event is used as a trigger to rebuild all transitions for this bundle
+                bstDao.rebuildTransitionsForBundle(eventEffective.getBundleId(), createCallContext(eventEffective));
+                return null;
+            }
+        });
     }
 
     @Subscribe
-    public void handleRequestedSubscriptionTransitionChange(final RequestedSubscriptionInternalEvent eventRequested) throws AccountApiException, EntitlementUserApiException {
-        // The event is used as a trigger to rebuild all transitions for this bundle
-        bstDao.rebuildTransitionsForBundle(eventRequested.getBundleId(), createCallContext(eventRequested));
+    public void handleRequestedSubscriptionTransitionChange(final RequestedSubscriptionInternalEvent eventRequested) {
+        updateWithAccountLock(eventRequested.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // The event is used as a trigger to rebuild all transitions for this bundle
+                bstDao.rebuildTransitionsForBundle(eventRequested.getBundleId(), createCallContext(eventRequested));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleRepairEntitlement(final RepairEntitlementInternalEvent event) {
-        // In case of repair, just rebuild all transitions
-        bstDao.rebuildTransitionsForBundle(event.getBundleId(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // In case of repair, just rebuild all transitions
+                bstDao.rebuildTransitionsForBundle(event.getBundleId(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleAccountCreation(final AccountCreationInternalEvent event) {
-        GlobalLock lock = null;
-        try {
-            lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, event.getId().toString(), NB_LOCK_TRY);
-            bacDao.accountUpdated(event.getId(), createCallContext(event));
-        } finally {
-            if (lock != null) {
-                lock.release();
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                bacDao.accountUpdated(event.getId(), createCallContext(event));
+                return null;
             }
-        }
+        });
     }
 
     @Subscribe
@@ -119,21 +138,25 @@ public class AnalyticsListener {
             return;
         }
 
-        GlobalLock lock = null;
-        try {
-            lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, event.getAccountId().toString(), NB_LOCK_TRY);
-            bacDao.accountUpdated(event.getAccountId(), createCallContext(event));
-        } finally {
-            if (lock != null) {
-                lock.release();
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                bacDao.accountUpdated(event.getAccountId(), createCallContext(event));
+                return null;
             }
-        }
+        });
     }
 
     @Subscribe
     public void handleInvoiceCreation(final InvoiceCreationInternalEvent event) {
-        // The event is used as a trigger to rebuild all invoices and invoice items for this account
-        invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // The event is used as a trigger to rebuild all invoices and invoice items for this account
+                invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
@@ -143,53 +166,101 @@ public class AnalyticsListener {
 
     @Subscribe
     public void handleInvoiceAdjustment(final InvoiceAdjustmentInternalEvent event) {
-        // The event is used as a trigger to rebuild all invoices and invoice items for this account
-        invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // The event is used as a trigger to rebuild all invoices and invoice items for this account
+                invoiceDao.rebuildInvoicesForAccount(event.getAccountId(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handlePaymentInfo(final PaymentInfoInternalEvent paymentInfo) {
-        bipDao.invoicePaymentPosted(paymentInfo.getAccountId(),
-                                    paymentInfo.getPaymentId(),
-                                    paymentInfo.getExtFirstPaymentRefId(),
-                                    paymentInfo.getExtSecondPaymentRefId(),
-                                    paymentInfo.getStatus().toString(),
-                                    createCallContext(paymentInfo));
+        updateWithAccountLock(paymentInfo.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                bipDao.invoicePaymentPosted(paymentInfo.getAccountId(),
+                                            paymentInfo.getPaymentId(),
+                                            paymentInfo.getExtFirstPaymentRefId(),
+                                            paymentInfo.getExtSecondPaymentRefId(),
+                                            paymentInfo.getStatus().toString(),
+                                            createCallContext(paymentInfo));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handlePaymentError(final PaymentErrorInternalEvent paymentError) {
-        bipDao.invoicePaymentPosted(paymentError.getAccountId(),
-                                    paymentError.getPaymentId(),
-                                    null,
-                                    null,
-                                    paymentError.getMessage(),
-                                    createCallContext(paymentError));
+        updateWithAccountLock(paymentError.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                bipDao.invoicePaymentPosted(paymentError.getAccountId(),
+                                            paymentError.getPaymentId(),
+                                            null,
+                                            null,
+                                            paymentError.getMessage(),
+                                            createCallContext(paymentError));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleOverdueChange(final OverdueChangeInternalEvent changeEvent) {
-        bosDao.overdueStatusChanged(changeEvent.getOverdueObjectType(), changeEvent.getOverdueObjectId(), createCallContext(changeEvent));
+        updateWithAccountLock(changeEvent.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                bosDao.overdueStatusChanged(changeEvent.getOverdueObjectType(), changeEvent.getOverdueObjectId(), createCallContext(changeEvent));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleControlTagCreation(final ControlTagCreationInternalEvent event) {
-        tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleControlTagDeletion(final ControlTagDeletionInternalEvent event) {
-        tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleUserTagCreation(final UserTagCreationInternalEvent event) {
-        tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                tagDao.tagAdded(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
     public void handleUserTagDeletion(final UserTagDeletionInternalEvent event) {
-        tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+        updateWithAccountLock(event.getAccountRecordId(), new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                tagDao.tagRemoved(event.getObjectType(), event.getObjectId(), event.getTagDefinition().getName(), createCallContext(event));
+                return null;
+            }
+        });
     }
 
     @Subscribe
@@ -213,6 +284,24 @@ public class AnalyticsListener {
     }
 
     private InternalCallContext createCallContext(final BusInternalEvent event) {
-        return internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(), "AnalyticsService", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+        return internalCallContextFactory.createInternalCallContext(event.getTenantRecordId(), event.getAccountRecordId(),
+                                                                    "AnalyticsService", CallOrigin.INTERNAL, UserType.SYSTEM, event.getUserToken());
+    }
+
+    private <T> T updateWithAccountLock(final Long accountRecordId, final Callable<T> task) {
+        GlobalLock lock = null;
+        try {
+            final String lockKey = accountRecordId == null ? "0" : accountRecordId.toString();
+            lock = locker.lockWithNumberOfTries(LockerType.ACCOUNT_FOR_ANALYTICS, lockKey, NB_LOCK_TRY);
+            return task.call();
+        } catch (Exception e) {
+            log.warn("Exception while refreshing analytics tables", e);
+        } finally {
+            if (lock != null) {
+                lock.release();
+            }
+        }
+
+        return null;
     }
 }
diff --git a/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDao.java b/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDao.java
index ea9ff61..58243d3 100644
--- a/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDao.java
+++ b/util/src/main/java/com/ning/billing/util/entity/dao/EntitySqlDao.java
@@ -30,7 +30,6 @@ import com.ning.billing.util.audit.ChangeType;
 import com.ning.billing.util.callcontext.InternalCallContext;
 import com.ning.billing.util.callcontext.InternalTenantContext;
 import com.ning.billing.util.dao.AuditSqlDao;
-import com.ning.billing.util.dao.EntityHistoryModelDao;
 import com.ning.billing.util.dao.HistorySqlDao;
 import com.ning.billing.util.entity.Entity;
 import com.ning.billing.util.entity.EntityPersistenceException;