keycloak-uncached
Changes
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java 10(+9 -1)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java 11(+10 -1)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
index c6d3814..3eb5518 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
@@ -30,6 +30,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.CacheDecorators;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -205,9 +206,12 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity) {
boolean replaced = false;
+ int iteration = 0;
V session = oldVersionEntity.getEntity();
- while (!replaced) {
+ while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
+ iteration++;
+
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata());
// Atomic cluster-aware replace
@@ -236,6 +240,10 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
}
}
+ if (!replaced) {
+ logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
+ }
+
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index e32b919..15c9e4f 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -34,6 +34,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -132,7 +133,11 @@ public class RemoteCacheInvoker {
private <K, V extends SessionEntity> void replace(RemoteCache<K, SessionEntityWrapper<V>> remoteCache, long lifespanMs, long maxIdleMs, K key, SessionUpdateTask<V> task) {
boolean replaced = false;
- while (!replaced) {
+ int iteration = 0;
+
+ while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
+ iteration++;
+
VersionedValue<SessionEntityWrapper<V>> versioned = remoteCache.getVersioned(key);
if (versioned == null) {
logger.warnf("Not found entity to replace for key '%s'", key);
@@ -159,6 +164,10 @@ public class RemoteCacheInvoker {
}
}
}
+
+ if (!replaced) {
+ logger.warnf("Failed to replace entity '%s' in remote cache '%s'", key, remoteCache.getName());
+ }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
index dcff43d..0566b6b 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/InfinispanUtil.java
@@ -34,6 +34,8 @@ import org.keycloak.models.KeycloakSession;
*/
public class InfinispanUtil {
+ public static final int MAXIMUM_REPLACE_RETRIES = 25;
+
// See if we have RemoteStore (external JDG) configured for cross-Data-Center scenario
public static Set<RemoteStore> getRemoteStores(Cache ispnCache) {
return ispnCache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class);
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
index fff93e3..12a3ee5 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
@@ -27,6 +27,7 @@ import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@@ -77,40 +78,60 @@ public class DistributedCacheConcurrentWritesTest {
clientSession.setTimestamp(1234);
session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
- cache1.put("123", session);
+ try {
+ for (int i = 0; i < 10; i++) {
+ testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
+ }
+ } finally {
+
+ // Kill JVM
+ cache1.getCache().stop();
+ cache2.getCache().stop();
+ cache1.getCache().getCacheManager().stop();
+ cache2.getCache().getCacheManager().stop();
+
+ System.out.println("Managers killed");
+ }
+ }
+
+
+ // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
+ private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
+ CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
+ cache1.put(key, session);
// Create 2 workers for concurrent write and start them
- Worker worker1 = new Worker(1, cache1);
- Worker worker2 = new Worker(2, cache2);
+ Worker worker1 = new Worker(1, cache1, key);
+ Worker worker2 = new Worker(2, cache2, key);
long start = System.currentTimeMillis();
- System.out.println("Started clustering test");
+ System.out.println("Started clustering test for key " + key);
worker1.start();
//worker1.join();
worker2.start();
+ Thread.sleep(1000);
+ // Try to remove the entity after some sleep time.
+ cache1.wrappedCache.getAdvancedCache()
+ .withFlags(Flag.CACHE_MODE_LOCAL)
+ .remove(key);
+
worker1.join();
worker2.join();
long took = System.currentTimeMillis() - start;
- session = cache1.get("123").getEntity();
- System.out.println("Took: " + took + " ms. Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
- + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
-
- // JGroups statistics
- JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
- System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
- ", received messages: " + channel.getReceivedMessages());
-
- // Kill JVM
- cache1.getCache().stop();
- cache2.getCache().stop();
- cache1.getCache().getCacheManager().stop();
- cache2.getCache().getCacheManager().stop();
-
- System.out.println("Managers killed");
+
+ System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
+
+// System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
+// + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+// // JGroups statistics
+// JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
+// System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
+// ", received messages: " + channel.getReceivedMessages());
}
@@ -118,10 +139,13 @@ public class DistributedCacheConcurrentWritesTest {
private final CacheWrapper<String, UserSessionEntity> cache;
private final int threadId;
+ private final String key;
- public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache) {
+ public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
this.threadId = threadId;
this.cache = cache;
+ this.key = key;
+ setName("th-" + key + "-" + threadId);
}
@Override
@@ -131,16 +155,25 @@ public class DistributedCacheConcurrentWritesTest {
String noteKey = "n-" + threadId + "-" + i;
+ // This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
+// boolean replaced = false;
+// while (!replaced) {
+// SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
+// oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
+// replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
+// }
+
+ int count = 0;
boolean replaced = false;
- while (!replaced) {
- SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get("123");
- UserSessionEntity oldSession = oldWrapped.getEntity();
- //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
- UserSessionEntity clone = oldSession;
-
- clone.getNotes().put(noteKey, "someVal");
- //cache.replace("123", clone);
- replaced = cacheReplace(oldWrapped, clone);
+ while (!replaced && count < 25) {
+ count++;
+ SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
+ oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
+ replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
+ }
+ if (!replaced) {
+ System.err.println("FAILED TO REPLACE ENTITY: " + key);
+ return;
}
}
@@ -148,8 +181,8 @@ public class DistributedCacheConcurrentWritesTest {
private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try {
- boolean replaced = cache.replace("123", oldSession, newSession);
- //cache.replace("123", newSession);
+ boolean replaced = cache.replace(key, oldSession, newSession);
+ //cache.replace(key, newSession);
if (!replaced) {
failedReplaceCounter.incrementAndGet();
//return false;
@@ -239,7 +272,7 @@ public class DistributedCacheConcurrentWritesTest {
ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
if (clustered) {
distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
- distConfigBuilder.clustering().hash().numOwners(1);
+ distConfigBuilder.clustering().hash().numOwners(2);
// Disable L1 cache
distConfigBuilder.clustering().hash().l1().enabled(false);