keycloak-uncached

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
index d3bcacc..43bb2b3 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
@@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
 
             return wrappedEntity;
         } else {
-            return myUpdates.getEntityWrapper();
+            S entity = myUpdates.getEntityWrapper().getEntity();
+
+            // If entity is scheduled for remove, we don't return it.
+            boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
+
+                return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
+
+            }).findFirst().isPresent();
+
+            return scheduledForRemove ? null : myUpdates.getEntityWrapper();
         }
     }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index 8d04a50..7f10f9e 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -321,13 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
                     // Recursion. We should have it locally now
                     return getUserSessionWithPredicate(realm, id, offline, predicate);
+                } else {
+                    log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
+
+                    return null;
                 }
+            } else {
+                log.debugf("getUserSessionWithPredicate(%s): not found", id);
+
+                // Session not available on remoteCache. Was already removed there. So removing locally too.
+                // TODO: Can be optimized to skip calling remoteCache.remove
+                removeUserSession(realm, userSession);
+
+                return null;
             }
-        }
+        } else {
 
-        log.debugf("getUserSessionWithPredicate(%s): not found", id);
+            log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
 
-        return null;
+            return null;
+        }
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
index 3d31122..60c34bf 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
@@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
     public boolean delete(Object key) throws PersistenceException {
         logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
 
-        // Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
-        getRemoteCache().removeAsync(key);
+        // Optimization - we don't need to know the previous value.
+        getRemoteCache().remove(key);
 
         return true;
     }
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
index f18d8d3..b524885 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
 import org.infinispan.client.hotrod.annotation.ClientListener;
 import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
-import org.infinispan.configuration.cache.Configuration;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.persistence.manager.PersistenceManager;
 import org.infinispan.persistence.remote.RemoteStore;
-import org.infinispan.persistence.remote.configuration.ExhaustedAction;
 import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
@@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
     }
 
     private static Worker createWorker(int threadId) {
-        EmbeddedCacheManager manager = createManager(threadId);
+        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
         Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
 
         System.out.println("Retrieved cache: " + threadId);
@@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
         return new Worker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
-
-    }
-
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .addServer()
-                .host("localhost")
-                .port(port)
-                .connectionPool()
-                .maxActive(20)
-                .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                .   enabled(false).build();
-    }
-
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
index df1b80e..9c23452 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
@@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
     }
 
     private static Worker createWorker(int threadId) {
-        EmbeddedCacheManager manager = createManager(threadId);
+        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
         Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
 
         System.out.println("Retrieved cache: " + threadId);
@@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
         return new Worker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
-
-    }
-
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .addServer()
-                    .host("localhost")
-                    .port(port)
-                .connectionPool()
-                    .maxActive(20)
-                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                .   enabled(false).build();
-    }
-
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
new file mode 100644
index 0000000..ff4c3ce
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.context.Flag;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.keycloak.common.util.Time;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+
+/**
+ * Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
+ *
+ * Also check that listeners are executed asynchronously with some delay.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyJDGRemoveSessionTest {
+
+    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
+
+    private static final int ITERATIONS = 10000;
+
+    private static RemoteCache remoteCache1;
+    private static RemoteCache remoteCache2;
+
+    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
+    private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
+
+    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
+    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
+
+    //private static Map<String, EntryInfo> state = new HashMap<>();
+
+    public static void main(String[] args) throws Exception {
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+
+        // Create caches, listeners and finally worker threads
+        Thread worker1 = createWorker(cache1, 1);
+        Thread worker2 = createWorker(cache2, 2);
+
+        // Create 100 initial sessions
+        for (int i=0 ; i<ITERATIONS ; i++) {
+            String sessionId = String.valueOf(i);
+            SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
+            cache1.put(sessionId, wrappedSession);
+        }
+
+        logger.info("SESSIONS CREATED");
+
+        // Create 100 initial sessions
+        for (int i=0 ; i<ITERATIONS ; i++) {
+            String sessionId = String.valueOf(i);
+            SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+            Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
+        }
+
+        logger.info("SESSIONS AVAILABLE ON DC2");
+
+
+        long start = System.currentTimeMillis();
+
+        try {
+            // Just running in current thread
+            worker1.run();
+
+            logger.info("SESSIONS REMOVED");
+
+            //Thread.sleep(5000);
+
+            // Doing it in opposite direction to ensure that newer are checked first.
+            // This us currently FAILING (expected) as listeners are executed asynchronously.
+            for (int i=ITERATIONS-1 ; i>=0 ; i--) {
+                String sessionId = String.valueOf(i);
+
+                logger.infof("Before call cache2.get: %s", sessionId);
+
+                SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+                Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
+            }
+
+            logger.info("SESSIONS NOT AVAILABLE ON DC2");
+
+
+            //        // Start and join workers
+//        worker1.start();
+//        worker2.start();
+//
+//        worker1.join();
+//        worker2.join();
+
+        } finally {
+            Thread.sleep(2000);
+
+            // Finish JVM
+            cache1.getCacheManager().stop();
+            cache2.getCacheManager().stop();
+        }
+
+        long took = System.currentTimeMillis() - start;
+
+//        // Output
+//        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+//            System.out.println(entry.getKey() + ":::" + entry.getValue());
+//            worker1.cache.remove(entry.getKey());
+//        }
+
+//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
+//
+//        System.out.println("Sleeping before other report");
+//
+//        Thread.sleep(1000);
+//
+//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+
+    }
+
+
+    private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
+        // Create 100 initial sessions
+        UserSessionEntity session = new UserSessionEntity();
+        session.setId(sessionId);
+        session.setRealm("foo");
+        session.setBrokerSessionId("!23123123");
+        session.setBrokerUserId(null);
+        session.setUser("foo");
+        session.setLoginUsername("foo");
+        session.setIpAddress("123.44.143.178");
+        session.setStarted(Time.currentTime());
+        session.setLastSessionRefresh(Time.currentTime());
+
+        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
+        clientSession.setAuthMethod("saml");
+        clientSession.setAction("something");
+        clientSession.setTimestamp(1234);
+        clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
+        clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
+        session.getAuthenticatedClientSessions().put("client1", clientSession);
+
+        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
+        return wrappedSession;
+    }
+
+
+    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
+        System.out.println("Retrieved cache: " + threadId);
+
+        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
+
+        if (threadId == 1) {
+            remoteCache1 = remoteCache;
+        } else {
+            remoteCache2 = remoteCache;
+        }
+
+        AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
+        HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
+        remoteCache.addClientListener(listener);
+
+        return new RemoteCacheWorker(remoteCache, threadId);
+        //return new CacheWorker(cache, threadId);
+    }
+
+
+    private static EmbeddedCacheManager createManager(int threadId) {
+        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
+    }
+
+
+    @ClientListener
+    public static class HotRodListener {
+
+        private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
+        private RemoteCache remoteCache;
+        private AtomicInteger listenerCount;
+
+        public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
+            this.listenerCount = listenerCount;
+            this.remoteCache = remoteCache;
+            this.origCache = origCache;
+        }
+
+
+        @ClientCacheEntryCreated
+        public void created(ClientCacheEntryCreatedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for creating of session %s", cacheKey);
+        }
+
+
+        @ClientCacheEntryModified
+        public void modified(ClientCacheEntryModifiedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for modifying of session %s", cacheKey);
+        }
+
+
+        @ClientCacheEntryRemoved
+        public void removed(ClientCacheEntryRemovedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for removing of session %s", cacheKey);
+
+            // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
+            origCache
+                    .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
+                    .remove(cacheKey);
+
+        }
+
+    }
+
+    private static class RemoteCacheWorker extends Thread {
+
+        private final RemoteCache<String, Object> remoteCache;
+
+        private final int myThreadId;
+
+        private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
+            this.remoteCache = remoteCache;
+            this.myThreadId = myThreadId;
+        }
+
+        @Override
+        public void run() {
+
+            for (int i=0 ; i<ITERATIONS ; i++) {
+                String sessionId = String.valueOf(i);
+                remoteCache.remove(sessionId);
+
+
+                logger.infof("Session %s removed on DC1", sessionId);
+
+                // Check if it's immediately seen that session is removed on 2nd DC
+                RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
+                SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
+                Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
+
+                // Also check that it's immediatelly removed on my DC
+                SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
+                Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
+            }
+
+        }
+
+    }
+
+}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index 7101d38..dd34b19 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
  */
 public class ConcurrencyJDGSessionsCacheTest {
 
-    protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
+    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
 
     private static final int ITERATION_PER_WORKER = 1000;
 
@@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
         //return new CacheWorker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
 
+    private static EmbeddedCacheManager createManager(int threadId) {
+        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
     }
 
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
-                .addServer()
-                    .host("localhost")
-                    .port(port)
-                .connectionPool()
-                    .maxActive(20)
-                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                    .enabled(false).build();
-    }
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
new file mode 100644
index 0000000..06dd95f
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.ExhaustedAction;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+class TestCacheManagerFactory {
+
+
+    <T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
+        System.setProperty("java.net.preferIPv4Stack", "true");
+        System.setProperty("jgroups.tcp.port", "53715");
+        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+        boolean clustered = false;
+        boolean async = false;
+        boolean allowDuplicateJMXDomains = true;
+
+        if (clustered) {
+            gcb = gcb.clusteredDefault();
+            gcb.transport().clusterName("test-clustering");
+        }
+
+        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
+
+        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
+
+        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
+
+        cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
+        return cacheManager;
+
+    }
+
+
+    private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
+        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
+
+        int port = threadId==1 ? 12232 : 13232;
+        //int port = 12232;
+
+        return cacheConfigBuilder.persistence().addStore(builderClass)
+                .fetchPersistentState(false)
+                .ignoreModifications(false)
+                .purgeOnStartup(false)
+                .preload(false)
+                .shared(true)
+                .remoteCacheName(cacheName)
+                .rawValues(true)
+                .forceReturnValues(false)
+                .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
+                .addServer()
+                    .host("localhost")
+                .port(port)
+                .connectionPool()
+                    .maxActive(20)
+                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
+                .async()
+                .   enabled(false).build();
+    }
+}
diff --git a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
index 11d44af..c06caa0 100755
--- a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
+++ b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
@@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
 import org.keycloak.services.managers.AuthenticationManager;
 import org.keycloak.services.managers.AuthenticationSessionManager;
 import org.keycloak.services.managers.ClientSessionCode;
+import org.keycloak.services.managers.UserSessionCrossDCManager;
 import org.keycloak.services.messages.Messages;
 import org.keycloak.services.resources.LoginActionsService;
 import org.keycloak.services.util.CacheControlUtil;
@@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
             }
         }
 
-        UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
+        UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
 
         if (userSession != null) {
             logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);
diff --git a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
index 1d1a3be..11795e5 100644
--- a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
@@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
 
         });
     }
+
+
+    // Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
+    public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
+        UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
+
+        // This will remove userSession "locally" if it doesn't exists on remoteCache
+        kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
+            return userSession2 == null;
+        });
+
+        return kcSession.sessions().getUserSession(realm, id);
+    }
 }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
index f305013..c0eb849 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
@@ -39,8 +39,8 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
 
         //log.info("Started to sleep");
         //Thread.sleep(10000000);
-        for (int i=0 ; i<10 ; i++) {
-            OAuthClient.AuthorizationEndpointResponse response1 = Retry.call(() -> oauth.doLogin("test-user@localhost", "password"), 20, 100);
+        for (int i=0 ; i<30 ; i++) {
+            OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
             String code = response1.getCode();
             OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");
             Assert.assertNotNull(response2.getAccessToken());