keycloak-uncached

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
index c6d3814..3eb5518 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
@@ -30,6 +30,7 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.sessions.infinispan.CacheDecorators;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -205,9 +206,12 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
 
     private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity) {
         boolean replaced = false;
+        int iteration = 0;
         V session = oldVersionEntity.getEntity();
 
-        while (!replaced) {
+        while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
+            iteration++;
+
             SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata());
 
             // Atomic cluster-aware replace
@@ -236,6 +240,10 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
             }
         }
 
+        if (!replaced) {
+            logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
+        }
+
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index e32b919..15c9e4f 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -34,6 +34,7 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -132,7 +133,11 @@ public class RemoteCacheInvoker {
 
     private <K, V extends SessionEntity> void replace(RemoteCache<K, SessionEntityWrapper<V>> remoteCache, long lifespanMs, long maxIdleMs, K key, SessionUpdateTask<V> task) {
         boolean replaced = false;
-        while (!replaced) {
+        int iteration = 0;
+
+        while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
+            iteration++;
+
             VersionedValue<SessionEntityWrapper<V>> versioned = remoteCache.getVersioned(key);
             if (versioned == null) {
                 logger.warnf("Not found entity to replace for key '%s'", key);
@@ -159,6 +164,10 @@ public class RemoteCacheInvoker {
                 }
             }
         }
+
+        if (!replaced) {
+            logger.warnf("Failed to replace entity '%s' in remote cache '%s'", key, remoteCache.getName());
+        }
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
index dcff43d..0566b6b 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
@@ -34,6 +34,8 @@ import org.keycloak.models.KeycloakSession;
  */
 public class InfinispanUtil {
 
+    public static final int MAXIMUM_REPLACE_RETRIES = 25;
+
     // See if we have RemoteStore (external JDG) configured for cross-Data-Center scenario
     public static Set<RemoteStore> getRemoteStores(Cache ispnCache) {
         return ispnCache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class);
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
index fff93e3..12a3ee5 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
@@ -27,6 +27,7 @@ import org.infinispan.configuration.cache.CacheMode;
 import org.infinispan.configuration.cache.Configuration;
 import org.infinispan.configuration.cache.ConfigurationBuilder;
 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.context.Flag;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@@ -77,40 +78,60 @@ public class DistributedCacheConcurrentWritesTest {
         clientSession.setTimestamp(1234);
         session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
 
-        cache1.put("123", session);
+        try {
+            for (int i = 0; i < 10; i++) {
+                testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
+            }
+        } finally {
+
+            // Kill JVM
+            cache1.getCache().stop();
+            cache2.getCache().stop();
+            cache1.getCache().getCacheManager().stop();
+            cache2.getCache().getCacheManager().stop();
+
+            System.out.println("Managers killed");
+        }
+    }
+
+
+    // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
+    private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
+                                                 CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
+        cache1.put(key, session);
 
         // Create 2 workers for concurrent write and start them
-        Worker worker1 = new Worker(1, cache1);
-        Worker worker2 = new Worker(2, cache2);
+        Worker worker1 = new Worker(1, cache1, key);
+        Worker worker2 = new Worker(2, cache2, key);
 
         long start = System.currentTimeMillis();
 
-        System.out.println("Started clustering test");
+        System.out.println("Started clustering test for key " + key);
 
         worker1.start();
         //worker1.join();
         worker2.start();
 
+        Thread.sleep(1000);
+        // Try to remove the entity after some sleep time.
+        cache1.wrappedCache.getAdvancedCache()
+                .withFlags(Flag.CACHE_MODE_LOCAL)
+                .remove(key);
+
         worker1.join();
         worker2.join();
 
         long took = System.currentTimeMillis() - start;
-        session = cache1.get("123").getEntity();
-        System.out.println("Took: " + took + " ms. Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
-                + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
-
-        // JGroups statistics
-        JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
-        System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
-                ", received messages: " + channel.getReceivedMessages());
-
-        // Kill JVM
-        cache1.getCache().stop();
-        cache2.getCache().stop();
-        cache1.getCache().getCacheManager().stop();
-        cache2.getCache().getCacheManager().stop();
-
-        System.out.println("Managers killed");
+
+        System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
+
+//        System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
+//                + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+//        // JGroups statistics
+//        JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
+//        System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
+//                ", received messages: " + channel.getReceivedMessages());
     }
 
 
@@ -118,10 +139,13 @@ public class DistributedCacheConcurrentWritesTest {
 
         private final CacheWrapper<String, UserSessionEntity> cache;
         private final int threadId;
+        private final String key;
 
-        public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache) {
+        public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
             this.threadId = threadId;
             this.cache = cache;
+            this.key = key;
+            setName("th-" + key + "-" + threadId);
         }
 
         @Override
@@ -131,16 +155,25 @@ public class DistributedCacheConcurrentWritesTest {
 
                 String noteKey = "n-" + threadId + "-" + i;
 
+                  // This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
+//                boolean replaced = false;
+//                while (!replaced) {
+//                    SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
+//                    oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
+//                    replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
+//                }
+
+                int count = 0;
                 boolean replaced = false;
-                while (!replaced) {
-                    SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get("123");
-                    UserSessionEntity oldSession = oldWrapped.getEntity();
-                    //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
-                    UserSessionEntity clone = oldSession;
-
-                    clone.getNotes().put(noteKey, "someVal");
-                    //cache.replace("123", clone);
-                    replaced = cacheReplace(oldWrapped, clone);
+                while (!replaced && count < 25) {
+                    count++;
+                    SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
+                    oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
+                    replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
+                }
+                if (!replaced) {
+                    System.err.println("FAILED TO REPLACE ENTITY: " + key);
+                    return;
                 }
             }
 
@@ -148,8 +181,8 @@ public class DistributedCacheConcurrentWritesTest {
 
         private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
             try {
-                boolean replaced = cache.replace("123", oldSession, newSession);
-                //cache.replace("123", newSession);
+                boolean replaced = cache.replace(key, oldSession, newSession);
+                //cache.replace(key, newSession);
                 if (!replaced) {
                     failedReplaceCounter.incrementAndGet();
                     //return false;
@@ -239,7 +272,7 @@ public class DistributedCacheConcurrentWritesTest {
         ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
         if (clustered) {
             distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
-            distConfigBuilder.clustering().hash().numOwners(1);
+            distConfigBuilder.clustering().hash().numOwners(2);
 
             // Disable L1 cache
             distConfigBuilder.clustering().hash().l1().enabled(false);