keycloak-aplcache
Changes
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java 6(+1 -5)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java 2(+1 -1)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
index 4e349f6..f9adf9b 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
@@ -41,10 +41,6 @@ public class LastSessionRefreshChecker {
}
- // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
- public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
-
-
public SessionUpdateTask.CrossDCMessageStatus getCrossDCMessageStatus(KeycloakSession kcSession, RealmModel realm, SessionEntityWrapper<UserSessionEntity> sessionWrapper, boolean offline, int newLastSessionRefresh) {
// revokeRefreshToken always writes everything to remoteCache immediately
if (realm.isRevokeRefreshToken()) {
@@ -62,7 +58,7 @@ public class LastSessionRefreshChecker {
return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
}
- Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(LAST_SESSION_REFRESH_REMOTE);
+ Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE);
if (lsrr == null) {
logger.warnf("Not available lsrr note on user session %s.", sessionWrapper.getEntity().getId());
return SessionUpdateTask.CrossDCMessageStatus.SYNC;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
index d8a1e6c..394fbd8 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
@@ -39,7 +39,7 @@ public class UserSessionEntity extends SessionEntity {
public static final Logger logger = Logger.getLogger(UserSessionEntity.class);
- // Tracks the "lastSessionRefresh" from userSession entity from remote cache
+ // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
private String user;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index 424a2f7..60cda43 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -17,6 +17,7 @@
package org.keycloak.models.sessions.infinispan.remotestore;
+import org.keycloak.common.util.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -32,6 +33,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -76,11 +78,12 @@ public class RemoteCacheInvoker {
logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
- runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, session);
+ runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
}
- private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, S session) {
+ private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, SessionEntityWrapper<S> sessionWrapper) {
+ S session = sessionWrapper.getEntity();
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
switch (operation) {
@@ -92,12 +95,14 @@ public class RemoteCacheInvoker {
remoteCache.put(key, session, task.getLifespanMs(), TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
break;
case ADD_IF_ABSENT:
+ final int currentTime = Time.currentTime();
SessionEntity existing = (SessionEntity) remoteCache
.withFlags(Flag.FORCE_RETURN_VALUE)
.putIfAbsent(key, session, -1, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
if (existing != null) {
throw new IllegalStateException("There is already existing value in cache for key " + key);
}
+ sessionWrapper.putLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE, currentTime);
break;
case REPLACE:
replace(remoteCache, task.getLifespanMs(), maxIdleMs, key, task);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
index fa7719e..d29e220 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
@@ -36,6 +36,9 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+import java.util.Random;
+import java.util.logging.Level;
+import org.infinispan.client.hotrod.VersionedValue;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -85,25 +88,48 @@ public class RemoteCacheSessionListener {
if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
- replaceRemoteEntityInCache(key);
+ replaceRemoteEntityInCache(key, event.getVersion());
}
}
+ private static final int MAXIMUM_REPLACE_RETRIES = 10;
- private void replaceRemoteEntityInCache(String key) {
+ private void replaceRemoteEntityInCache(String key, long eventVersion) {
// TODO can be optimized and remoteSession sent in the event itself?
- SessionEntityWrapper localEntityWrapper = cache.get(key);
- SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+ boolean replaced = false;
+ int replaceRetries = 0;
+ int sleepInterval = 25;
+ do {
+ replaceRetries++;
+
+ SessionEntityWrapper localEntityWrapper = cache.get(key);
+ VersionedValue remoteSessionVersioned = remoteCache.getVersioned(key);
+ if (remoteSessionVersioned == null || remoteSessionVersioned.getVersion() < eventVersion) {
+ try {
+ logger.debugf("Got replace remote entity event prematurely, will try again. Event version: %d, got: %d",
+ eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
+ Thread.sleep(new Random().nextInt(sleepInterval)); // using exponential backoff
+ continue;
+ } catch (InterruptedException ex) {
+ continue;
+ } finally {
+ sleepInterval = sleepInterval << 1;
+ }
+ }
+ SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+
+ logger.debugf("Read session%s. Entity read from remote cache: %s", replaceRetries > 1 ? "" : " again", remoteSession);
+
+ SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
- if (logger.isDebugEnabled()) {
- logger.debugf("Read session. Entity read from remote cache: %s", remoteSession.toString());
- }
-
- SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
+ // We received event from remoteCache, so we won't update it back
+ replaced = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
+ .replace(key, localEntityWrapper, sessionWrapper);
- // We received event from remoteCache, so we won't update it back
- cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
- .replace(key, sessionWrapper);
+ if (! replaced) {
+ logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession);
+ }
+ } while (replaceRetries < MAXIMUM_REPLACE_RETRIES && ! replaced);
}