keycloak-aplcache

KEYCLOAK-4187 Fix warnings

8/7/2017 11:29:04 AM

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
index 4e349f6..f9adf9b 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
@@ -41,10 +41,6 @@ public class LastSessionRefreshChecker {
     }
 
 
-    // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
-    public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
-
-
     public SessionUpdateTask.CrossDCMessageStatus getCrossDCMessageStatus(KeycloakSession kcSession, RealmModel realm, SessionEntityWrapper<UserSessionEntity> sessionWrapper, boolean offline, int newLastSessionRefresh) {
         // revokeRefreshToken always writes everything to remoteCache immediately
         if (realm.isRevokeRefreshToken()) {
@@ -62,7 +58,7 @@ public class LastSessionRefreshChecker {
             return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
         }
 
-        Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(LAST_SESSION_REFRESH_REMOTE);
+        Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE);
         if (lsrr == null) {
             logger.warnf("Not available lsrr note on user session %s.", sessionWrapper.getEntity().getId());
             return SessionUpdateTask.CrossDCMessageStatus.SYNC;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
index d8a1e6c..394fbd8 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
@@ -39,7 +39,7 @@ public class UserSessionEntity extends SessionEntity {
 
     public static final Logger logger = Logger.getLogger(UserSessionEntity.class);
 
-    // Tracks the "lastSessionRefresh" from userSession entity from remote cache
+    // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
     public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
 
     private String user;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index 424a2f7..60cda43 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -17,6 +17,7 @@
 
 package org.keycloak.models.sessions.infinispan.remotestore;
 
+import org.keycloak.common.util.Time;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -76,11 +78,12 @@ public class RemoteCacheInvoker {
 
         logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
 
-        runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, session);
+        runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
     }
 
 
-    private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, S session) {
+    private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, SessionEntityWrapper<S> sessionWrapper) {
+        S session = sessionWrapper.getEntity();
         SessionUpdateTask.CacheOperation operation = task.getOperation(session);
 
         switch (operation) {
@@ -92,12 +95,14 @@ public class RemoteCacheInvoker {
                 remoteCache.put(key, session, task.getLifespanMs(), TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
                 break;
             case ADD_IF_ABSENT:
+                final int currentTime = Time.currentTime();
                 SessionEntity existing = (SessionEntity) remoteCache
                         .withFlags(Flag.FORCE_RETURN_VALUE)
                         .putIfAbsent(key, session, -1, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
                 if (existing != null) {
                     throw new IllegalStateException("There is already existing value in cache for key " + key);
                 }
+                sessionWrapper.putLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE, currentTime);
                 break;
             case REPLACE:
                 replace(remoteCache, task.getLifespanMs(), maxIdleMs, key, task);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
index fa7719e..d29e220 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
@@ -36,6 +36,9 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+import java.util.Random;
+import java.util.logging.Level;
+import org.infinispan.client.hotrod.VersionedValue;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -85,25 +88,48 @@ public class RemoteCacheSessionListener  {
 
         if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
 
-            replaceRemoteEntityInCache(key);
+            replaceRemoteEntityInCache(key, event.getVersion());
         }
     }
 
+    private static final int MAXIMUM_REPLACE_RETRIES = 10;
 
-    private void replaceRemoteEntityInCache(String key) {
+    private void replaceRemoteEntityInCache(String key, long eventVersion) {
         // TODO can be optimized and remoteSession sent in the event itself?
-        SessionEntityWrapper localEntityWrapper = cache.get(key);
-        SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+        boolean replaced = false;
+        int replaceRetries = 0;
+        int sleepInterval = 25;
+        do {
+            replaceRetries++;
+            
+            SessionEntityWrapper localEntityWrapper = cache.get(key);
+            VersionedValue remoteSessionVersioned = remoteCache.getVersioned(key);
+            if (remoteSessionVersioned == null || remoteSessionVersioned.getVersion() < eventVersion) {
+                try {
+                    logger.debugf("Got replace remote entity event prematurely, will try again. Event version: %d, got: %d",
+                      eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
+                    Thread.sleep(new Random().nextInt(sleepInterval));  // using exponential backoff
+                    continue;
+                } catch (InterruptedException ex) {
+                    continue;
+                } finally {
+                    sleepInterval = sleepInterval << 1;
+                }
+            }
+            SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+
+            logger.debugf("Read session%s. Entity read from remote cache: %s", replaceRetries > 1 ? "" : " again", remoteSession);
+
+            SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
 
-        if (logger.isDebugEnabled()) {
-            logger.debugf("Read session. Entity read from remote cache: %s", remoteSession.toString());
-        }
-
-        SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
+            // We received event from remoteCache, so we won't update it back
+            replaced = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
+                    .replace(key, localEntityWrapper, sessionWrapper);
 
-        // We received event from remoteCache, so we won't update it back
-        cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
-                .replace(key, sessionWrapper);
+            if (! replaced) {
+                logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession);
+            }
+        } while (replaceRetries < MAXIMUM_REPLACE_RETRIES && ! replaced);
     }