killbill-memoizeit

util: introduce thread-local dirty DB flag If an API call

4/10/2018 6:33:48 AM

Details

diff --git a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
index 58285fb..9d65c32 100644
--- a/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
+++ b/entitlement/src/test/java/org/killbill/billing/entitlement/api/TestDefaultEntitlementApi.java
@@ -18,7 +18,6 @@
 
 package org.killbill.billing.entitlement.api;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
@@ -45,9 +44,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableList;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedDB {
@@ -542,7 +539,6 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
 
     }
 
-
     @Test(groups = "slow")
     public void testCreateEntitlementInThePast() throws AccountApiException, EntitlementApiException, SubscriptionBaseApiException {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
@@ -645,7 +641,6 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
 
     }
 
-
     @Test(groups = "slow")
     public void testCreateBaseWithEntitlementInTheFuture() throws AccountApiException, EntitlementApiException, SubscriptionApiException {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
@@ -757,7 +752,6 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
         assertEquals(entitlement.getEffectiveStartDate(), initialDate);
     }
 
-
     @Test(groups = "slow")
     public void testCreateBaseSubscriptionsWithAddOns() throws AccountApiException, EntitlementApiException, SubscriptionApiException {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
@@ -805,7 +799,6 @@ public class TestDefaultEntitlementApi extends EntitlementTestSuiteWithEmbeddedD
 
     }
 
-
     @Test(groups = "slow", expectedExceptions = EntitlementApiException.class)
     public void testCreateBaseSubscriptionsWithAddOnsMissingBase() throws AccountApiException, EntitlementApiException, SubscriptionApiException {
         final LocalDate initialDate = new LocalDate(2013, 8, 7);
diff --git a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/TestSubscriptionDao.java b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/TestSubscriptionDao.java
index 99b6544..2fc6531 100644
--- a/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/TestSubscriptionDao.java
+++ b/subscription/src/test/java/org/killbill/billing/subscription/engine/dao/TestSubscriptionDao.java
@@ -37,6 +37,9 @@ import org.killbill.billing.subscription.events.SubscriptionBaseEvent;
 import org.killbill.billing.subscription.events.user.ApiEventBuilder;
 import org.killbill.billing.subscription.events.user.ApiEventCreate;
 import org.killbill.billing.util.UUIDs;
+import org.killbill.billing.util.glue.KillbillApiAopModule;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.IDBI;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -166,4 +169,54 @@ public class TestSubscriptionDao extends SubscriptionTestSuiteWithEmbeddedDB {
 
 
     }
+
+    @Test(groups = "slow")
+    public void testDirtyFlag() throws SubscriptionBaseApiException {
+        // @BeforeMethod created the account
+        KillbillApiAopModule.resetDirtyDBFlag();
+
+        final IDBI dbiSpy = Mockito.spy(dbi);
+        final IDBI roDbiSpy = Mockito.spy(roDbi);
+        final SubscriptionDao subscriptionDao = new DefaultSubscriptionDao(dbiSpy,
+                                                                           roDbiSpy,
+                                                                           clock,
+                                                                           addonUtils,
+                                                                           notificationQueueService,
+                                                                           bus,
+                                                                           controlCacheDispatcher,
+                                                                           nonEntityDao,
+                                                                           internalCallContextFactory);
+        Mockito.verify(dbiSpy, Mockito.times(0)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(0)).open();
+
+        Assert.assertEquals(subscriptionDao.getSubscriptionBundleForAccount(accountId, internalCallContext).size(), 0);
+        Mockito.verify(dbiSpy, Mockito.times(0)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(1)).open();
+
+        Assert.assertEquals(subscriptionDao.getSubscriptionBundleForAccount(accountId, internalCallContext).size(), 0);
+        Mockito.verify(dbiSpy, Mockito.times(0)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(2)).open();
+
+        final String externalKey = UUID.randomUUID().toString();
+        final DateTime startDate = clock.getUTCNow();
+        final DateTime createdDate = startDate.plusSeconds(10);
+        final DefaultSubscriptionBaseBundle bundleDef = new DefaultSubscriptionBaseBundle(externalKey, accountId, startDate, startDate, createdDate, createdDate);
+        final SubscriptionBaseBundle bundle = subscriptionDao.createSubscriptionBundle(bundleDef, catalog, false, internalCallContext);
+        Mockito.verify(dbiSpy, Mockito.times(1)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(2)).open();
+
+        Assert.assertEquals(subscriptionDao.getSubscriptionBundleForAccount(accountId, internalCallContext).size(), 1);
+        Mockito.verify(dbiSpy, Mockito.times(2)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(2)).open();
+
+        Assert.assertEquals(subscriptionDao.getSubscriptionBundleForAccount(accountId, internalCallContext).size(), 1);
+        Mockito.verify(dbiSpy, Mockito.times(3)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(2)).open();
+
+        KillbillApiAopModule.resetDirtyDBFlag();
+
+        Assert.assertEquals(subscriptionDao.getSubscriptionBundleForAccount(accountId, internalCallContext).size(), 1);
+        Mockito.verify(dbiSpy, Mockito.times(3)).open();
+        Mockito.verify(roDbiSpy, Mockito.times(3)).open();
+    }
 }
diff --git a/subscription/src/test/java/org/killbill/billing/subscription/SubscriptionTestSuiteWithEmbeddedDB.java b/subscription/src/test/java/org/killbill/billing/subscription/SubscriptionTestSuiteWithEmbeddedDB.java
index fa1c027..2b5da99 100644
--- a/subscription/src/test/java/org/killbill/billing/subscription/SubscriptionTestSuiteWithEmbeddedDB.java
+++ b/subscription/src/test/java/org/killbill/billing/subscription/SubscriptionTestSuiteWithEmbeddedDB.java
@@ -36,11 +36,14 @@ import org.killbill.billing.subscription.api.timeline.SubscriptionBaseTimelineAp
 import org.killbill.billing.subscription.api.transfer.SubscriptionBaseTransferApi;
 import org.killbill.billing.subscription.api.user.SubscriptionBaseBundle;
 import org.killbill.billing.subscription.api.user.TestSubscriptionHelper;
+import org.killbill.billing.subscription.engine.addon.AddonUtils;
 import org.killbill.billing.subscription.engine.dao.SubscriptionDao;
 import org.killbill.billing.subscription.glue.TestDefaultSubscriptionModuleWithEmbeddedDB;
 import org.killbill.billing.util.config.definition.SubscriptionConfig;
 import org.killbill.billing.util.dao.NonEntityDao;
+import org.killbill.bus.api.PersistentBus;
 import org.killbill.clock.ClockMock;
+import org.killbill.notificationq.api.NotificationQueueService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -65,10 +68,12 @@ public class SubscriptionTestSuiteWithEmbeddedDB extends GuicyKillbillTestSuiteW
     protected SubscriptionBaseInternalApi subscriptionInternalApi;
     @Inject
     protected SubscriptionBaseTransferApi transferApi;
-
+    @Inject
+    protected PersistentBus bus;
     @Inject
     protected SubscriptionBaseTimelineApi repairApi;
-
+    @Inject
+    protected NotificationQueueService notificationQueueService;
     @Inject
     protected CatalogService catalogService;
     @Inject
@@ -79,7 +84,8 @@ public class SubscriptionTestSuiteWithEmbeddedDB extends GuicyKillbillTestSuiteW
     protected ClockMock clock;
     @Inject
     protected BusService busService;
-
+    @Inject
+    protected AddonUtils addonUtils;
     @Inject
     protected TestSubscriptionHelper testUtil;
     @Inject
diff --git a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoTransactionalJdbiWrapper.java b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoTransactionalJdbiWrapper.java
index 7a01782..a6f6287 100644
--- a/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoTransactionalJdbiWrapper.java
+++ b/util/src/main/java/org/killbill/billing/util/entity/dao/EntitySqlDaoTransactionalJdbiWrapper.java
@@ -24,6 +24,7 @@ import org.killbill.billing.util.cache.CacheControllerDispatcher;
 import org.killbill.billing.util.callcontext.InternalCallContextFactory;
 import org.killbill.billing.util.dao.NonEntityDao;
 import org.killbill.billing.util.entity.Entity;
+import org.killbill.billing.util.glue.KillbillApiAopModule;
 import org.killbill.clock.Clock;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -82,12 +83,13 @@ public class EntitySqlDaoTransactionalJdbiWrapper {
 
     /**
      * @param <ReturnType>                   object type to return from the transaction
-     * @param ro                             whether to use the read-only connection
+     * @param requestedRO                    hint as whether to use the read-only connection
      * @param entitySqlDaoTransactionWrapper transaction to execute
      * @return result from the transaction fo type ReturnType
      */
-    public <ReturnType> ReturnType execute(final boolean ro, final EntitySqlDaoTransactionWrapper<ReturnType> entitySqlDaoTransactionWrapper) {
+    public <ReturnType> ReturnType execute(final boolean requestedRO, final EntitySqlDaoTransactionWrapper<ReturnType> entitySqlDaoTransactionWrapper) {
         final String debugInfo = logger.isDebugEnabled() ? getDebugInfo() : null;
+        final boolean ro = shouldUseRODBI(requestedRO, debugInfo);
         final String debugPrefix = ro ? "RO" : "RW";
 
         final Handle handle = ro ? roDbi.open() : dbi.open();
@@ -107,12 +109,33 @@ public class EntitySqlDaoTransactionalJdbiWrapper {
         }
     }
 
+    private boolean shouldUseRODBI(final boolean requestedRO, final String debugInfo) {
+        if (!requestedRO) {
+            KillbillApiAopModule.setDirtyDBFlag();
+            logger.debug("[RW] Dirty flag set, transaction: {}", debugInfo);
+            return false;
+        } else {
+            if (KillbillApiAopModule.getDirtyDBFlag()) {
+                // Redirect to the rw instance, to work-around any replication delay
+                logger.debug("[RW] RO DBI handle requested, but dirty flag set, transaction: {}", debugInfo);
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+
     //
     // This is only used in the pagination APIs when streaming results. We want to keep the connection open, and also there is no need
     // to send bus events, record notifications where we need to keep the Connection through the jDBI Handle.
     //
     public <M extends EntityModelDao<E>, E extends Entity, T extends EntitySqlDao<M, E>> T onDemandForStreamingResults(final Class<T> sqlObjectType) {
-        return roDbi.onDemand(sqlObjectType);
+        final String debugInfo = logger.isDebugEnabled() ? getDebugInfo() : null;
+        if (shouldUseRODBI(true, debugInfo)) {
+            return roDbi.onDemand(sqlObjectType);
+        } else {
+            return dbi.onDemand(sqlObjectType);
+        }
     }
 
     /**
diff --git a/util/src/main/java/org/killbill/billing/util/glue/KillbillApiAopModule.java b/util/src/main/java/org/killbill/billing/util/glue/KillbillApiAopModule.java
index 9176282..6ca8209 100644
--- a/util/src/main/java/org/killbill/billing/util/glue/KillbillApiAopModule.java
+++ b/util/src/main/java/org/killbill/billing/util/glue/KillbillApiAopModule.java
@@ -35,6 +35,12 @@ import com.google.inject.matcher.Matchers;
 public class KillbillApiAopModule extends AbstractModule {
 
     private static final Logger logger = LoggerFactory.getLogger(KillbillApiAopModule.class);
+    private static final ThreadLocal<Boolean> perThreadDirtyDBFlag = new ThreadLocal<Boolean>();
+
+    static {
+        // Set an initial value
+        resetDirtyDBFlag();
+    }
 
     @Override
     protected void configure() {
@@ -53,15 +59,31 @@ public class KillbillApiAopModule extends AbstractModule {
             return prof.executeWithProfiling(ProfilingFeatureType.API, invocation.getMethod().getName(), new WithProfilingCallback() {
                 @Override
                 public Object execute() throws Throwable {
-                    logger.debug("Entering API call {}, arguments: {}", invocation.getMethod(), invocation.getArguments());
-                    final Object proceed = invocation.proceed();
-                    logger.debug("Exiting  API call {}, returning: {}", invocation.getMethod(), proceed);
-                    return proceed;
+                    try {
+                        logger.debug("Entering API call {}, arguments: {}", invocation.getMethod(), invocation.getArguments());
+                        final Object proceed = invocation.proceed();
+                        logger.debug("Exiting  API call {}, returning: {}", invocation.getMethod(), proceed);
+                        return proceed;
+                    } finally {
+                        resetDirtyDBFlag();
+                    }
                 }
             });
         }
     }
 
+    public static void setDirtyDBFlag() {
+        perThreadDirtyDBFlag.set(true);
+    }
+
+    public static void resetDirtyDBFlag() {
+        perThreadDirtyDBFlag.set(false);
+    }
+
+    public static Boolean getDirtyDBFlag() {
+        return perThreadDirtyDBFlag.get();
+    }
+
     private static final Matcher<Method> SYNTHETIC_METHOD_MATCHER = new Matcher<Method>() {
         @Override
         public boolean matches(final Method method) {
diff --git a/util/src/test/java/org/killbill/billing/GuicyKillbillTestSuiteWithEmbeddedDB.java b/util/src/test/java/org/killbill/billing/GuicyKillbillTestSuiteWithEmbeddedDB.java
index 719cff7..37e3f80 100644
--- a/util/src/test/java/org/killbill/billing/GuicyKillbillTestSuiteWithEmbeddedDB.java
+++ b/util/src/test/java/org/killbill/billing/GuicyKillbillTestSuiteWithEmbeddedDB.java
@@ -23,6 +23,7 @@ import javax.inject.Named;
 import javax.sql.DataSource;
 
 import org.killbill.billing.util.cache.CacheControllerDispatcher;
+import org.killbill.billing.util.glue.KillbillApiAopModule;
 import org.killbill.commons.embeddeddb.EmbeddedDB;
 import org.skife.jdbi.v2.IDBI;
 import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class GuicyKillbillTestSuiteWithEmbeddedDB extends GuicyKillbillTestSuite
     public void beforeMethod() throws Exception {
         cleanupAllTables();
         controlCacheDispatcher.clearAll();
+        KillbillApiAopModule.resetDirtyDBFlag();
     }
 
     protected void cleanupAllTables() {