keycloak-memoizeit

clustered testing

2/26/2016 6:45:58 PM

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java
index a6a14c0..e53f510 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java
@@ -93,7 +93,9 @@ import java.util.Set;
  * - roles are tricky because of composites.  Composite lists are cached too.  So, when a role is removed
  * we also iterate and invalidate any role or group that contains that role being removed.
  *
- *
+ * - Clustering gotchyas. With an invalidation cache, if you remove an entry on node 1 and this entry does not exist on node 2, node 2 will not receive a @Listener invalidation event.
+ * so, hat we have to put a marker entry in the invalidation cache before we read from the DB, so if the DB changes in between reading and adding a cache entry, the cache will be notified and bump
+ * the version information.
  *
  * - any relationship should be resolved from session.realms().  For example if JPA.getClientByClientId() is invoked,
  *  JPA should find the id of the client and then call session.realms().getClientById().  THis is to ensure that the cached
@@ -192,13 +194,16 @@ public class StreamCacheRealmProvider implements CacheRealmProvider {
 
             @Override
             public void commit() {
+                /*  THIS WAS CAUSING DEADLOCK IN A CLUSTER
                 if (delegate == null) return;
                 List<String> locks = new LinkedList<>();
                 locks.addAll(invalidations);
 
                 Collections.sort(locks); // lock ordering
                 cache.getRevisions().startBatch();
-                //if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks);
+
+                if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks);
+                */
 
             }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java
index 29fc43c..7ca6b7a 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java
@@ -24,6 +24,7 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidat
 import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
 import org.jboss.logging.Logger;
+import org.keycloak.models.cache.infinispan.entities.AbstractRevisioned;
 import org.keycloak.models.cache.infinispan.entities.CachedClient;
 import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate;
 import org.keycloak.models.cache.infinispan.entities.CachedGroup;
@@ -72,7 +73,11 @@ public class StreamRealmCache {
 
     public Long getCurrentRevision(String id) {
         Long revision = revisions.get(id);
-        if (revision == null) return UpdateCounter.current();
+        if (revision == null) revision = UpdateCounter.current();
+        // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
+        // so, we do this to force this.
+        String invalidationKey = "invalidation.key" + id;
+        cache.putForExternalRead(invalidationKey, new AbstractRevisioned(-1L, invalidationKey));
         return revision;
     }
 
@@ -104,6 +109,9 @@ public class StreamRealmCache {
 
     public Object invalidateObject(String id) {
         Revisioned removed = (Revisioned)cache.remove(id);
+        // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
+        // so, we do this to force the event.
+        cache.remove("invalidation.key" + id);
         bumpVersion(id);
         return removed;
     }
@@ -261,11 +269,32 @@ public class StreamRealmCache {
 
     @CacheEntryInvalidated
     public void cacheInvalidated(CacheEntryInvalidatedEvent<String, Object> event) {
-        if (!event.isPre()) {
-            bumpVersion(event.getKey());
+        if (event.isPre()) {
+            String key = event.getKey();
+            if (key.startsWith("invalidation.key")) {
+                // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
+                // so, we do this to force this.
+                String bump = key.substring("invalidation.key".length());
+                logger.tracev("bumping invalidation key {0}", bump);
+                bumpVersion(bump);
+                return;
+            }
+
+        } else {
+        //if (!event.isPre()) {
+            String key = event.getKey();
+            if (key.startsWith("invalidation.key")) {
+                // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
+                // so, we do this to force this.
+                String bump = key.substring("invalidation.key".length());
+                bumpVersion(bump);
+                logger.tracev("bumping invalidation key {0}", bump);
+                return;
+            }
+            bumpVersion(key);
             Object object = event.getValue();
             if (object != null) {
-                bumpVersion(event.getKey());
+                bumpVersion(key);
                 Predicate<Map.Entry<String, Revisioned>> predicate = getInvalidationPredicate(object);
                 if (predicate != null) runEvictions(predicate);
                 logger.tracev("invalidating: {0}" + object.getClass().getName());
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java
index a98121a..7073797 100755
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java
@@ -55,8 +55,8 @@ public class ClusteredConcurrencyTest {
 
     private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class);
 
-    private static final int DEFAULT_THREADS = 5;
-    private static final int DEFAULT_ITERATIONS = 20;
+    private static final int DEFAULT_THREADS = 10;
+    private static final int DEFAULT_ITERATIONS = 100;
 
     // If enabled only one request is allowed at the time. Useful for checking that test is working.
     private static final boolean SYNCHRONIZED = false;
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java
index daaf5a7..d47e78f 100755
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java
@@ -48,8 +48,8 @@ public class ConcurrencyTest extends AbstractClientTest {
 
     private static final Logger log = Logger.getLogger(ConcurrencyTest.class);
 
-    private static final int DEFAULT_THREADS = 5;
-    private static final int DEFAULT_ITERATIONS = 20;
+    private static final int DEFAULT_THREADS = 10;
+    private static final int DEFAULT_ITERATIONS = 100;
 
     // If enabled only one request is allowed at the time. Useful for checking that test is working.
     private static final boolean SYNCHRONIZED = false;