keycloak-uncached
Changes
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java 11(+10 -1)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java 19(+16 -3)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java 4(+2 -2)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java 58(+1 -57)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java 52(+1 -51)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java 292(+292 -0)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java 51(+3 -48)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
index d3bcacc..43bb2b3 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
@@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
return wrappedEntity;
} else {
- return myUpdates.getEntityWrapper();
+ S entity = myUpdates.getEntityWrapper().getEntity();
+
+ // If entity is scheduled for remove, we don't return it.
+ boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
+
+ return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
+
+ }).findFirst().isPresent();
+
+ return scheduledForRemove ? null : myUpdates.getEntityWrapper();
}
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index 8d04a50..7f10f9e 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -321,13 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
// Recursion. We should have it locally now
return getUserSessionWithPredicate(realm, id, offline, predicate);
+ } else {
+ log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
+
+ return null;
}
+ } else {
+ log.debugf("getUserSessionWithPredicate(%s): not found", id);
+
+ // Session not available on remoteCache. Was already removed there. So removing locally too.
+ // TODO: Can be optimized to skip calling remoteCache.remove
+ removeUserSession(realm, userSession);
+
+ return null;
}
- }
+ } else {
- log.debugf("getUserSessionWithPredicate(%s): not found", id);
+ log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
- return null;
+ return null;
+ }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
index 3d31122..60c34bf 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
@@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
public boolean delete(Object key) throws PersistenceException {
logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
- // Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
- getRemoteCache().removeAsync(key);
+ // Optimization - we don't need to know the previous value.
+ getRemoteCache().remove(key);
return true;
}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
index f18d8d3..b524885 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
-import org.infinispan.configuration.cache.Configuration;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
-import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.junit.Assert;
-import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
}
private static Worker createWorker(int threadId) {
- EmbeddedCacheManager manager = createManager(threadId);
+ EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
return new Worker(cache, threadId);
}
- private static EmbeddedCacheManager createManager(int threadId) {
- System.setProperty("java.net.preferIPv4Stack", "true");
- System.setProperty("jgroups.tcp.port", "53715");
- GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
- boolean clustered = false;
- boolean async = false;
- boolean allowDuplicateJMXDomains = true;
-
- if (clustered) {
- gcb = gcb.clusteredDefault();
- gcb.transport().clusterName("test-clustering");
- }
-
- gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
- EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
- Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
- cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
- return cacheManager;
-
- }
-
- private static Configuration getCacheBackedByRemoteStore(int threadId) {
- ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
- int port = threadId==1 ? 12232 : 13232;
- //int port = 12232;
-
- return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
- .fetchPersistentState(false)
- .ignoreModifications(false)
- .purgeOnStartup(false)
- .preload(false)
- .shared(true)
- .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
- .rawValues(true)
- .forceReturnValues(false)
- .addServer()
- .host("localhost")
- .port(port)
- .connectionPool()
- .maxActive(20)
- .exhaustedAction(ExhaustedAction.CREATE_NEW)
- .async()
- . enabled(false).build();
- }
-
@ClientListener
public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
index df1b80e..9c23452 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
@@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
}
private static Worker createWorker(int threadId) {
- EmbeddedCacheManager manager = createManager(threadId);
+ EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
return new Worker(cache, threadId);
}
- private static EmbeddedCacheManager createManager(int threadId) {
- System.setProperty("java.net.preferIPv4Stack", "true");
- System.setProperty("jgroups.tcp.port", "53715");
- GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
- boolean clustered = false;
- boolean async = false;
- boolean allowDuplicateJMXDomains = true;
-
- if (clustered) {
- gcb = gcb.clusteredDefault();
- gcb.transport().clusterName("test-clustering");
- }
-
- gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
- EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
- Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
- cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
- return cacheManager;
-
- }
-
- private static Configuration getCacheBackedByRemoteStore(int threadId) {
- ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
- int port = threadId==1 ? 12232 : 13232;
- //int port = 12232;
-
- return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
- .fetchPersistentState(false)
- .ignoreModifications(false)
- .purgeOnStartup(false)
- .preload(false)
- .shared(true)
- .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
- .rawValues(true)
- .forceReturnValues(false)
- .addServer()
- .host("localhost")
- .port(port)
- .connectionPool()
- .maxActive(20)
- .exhaustedAction(ExhaustedAction.CREATE_NEW)
- .async()
- . enabled(false).build();
- }
-
@ClientListener
public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
new file mode 100644
index 0000000..ff4c3ce
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.context.Flag;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.keycloak.common.util.Time;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+
+/**
+ * Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
+ *
+ * Also check that listeners are executed asynchronously with some delay.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyJDGRemoveSessionTest {
+
+ protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
+
+ private static final int ITERATIONS = 10000;
+
+ private static RemoteCache remoteCache1;
+ private static RemoteCache remoteCache2;
+
+ private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
+ private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
+
+ private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
+ private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
+
+ //private static Map<String, EntryInfo> state = new HashMap<>();
+
+ public static void main(String[] args) throws Exception {
+ Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+ Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+
+ // Create caches, listeners and finally worker threads
+ Thread worker1 = createWorker(cache1, 1);
+ Thread worker2 = createWorker(cache2, 2);
+
+ // Create 100 initial sessions
+ for (int i=0 ; i<ITERATIONS ; i++) {
+ String sessionId = String.valueOf(i);
+ SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
+ cache1.put(sessionId, wrappedSession);
+ }
+
+ logger.info("SESSIONS CREATED");
+
+ // Create 100 initial sessions
+ for (int i=0 ; i<ITERATIONS ; i++) {
+ String sessionId = String.valueOf(i);
+ SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+ Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
+ }
+
+ logger.info("SESSIONS AVAILABLE ON DC2");
+
+
+ long start = System.currentTimeMillis();
+
+ try {
+ // Just running in current thread
+ worker1.run();
+
+ logger.info("SESSIONS REMOVED");
+
+ //Thread.sleep(5000);
+
+ // Doing it in opposite direction to ensure that newer are checked first.
+ // This us currently FAILING (expected) as listeners are executed asynchronously.
+ for (int i=ITERATIONS-1 ; i>=0 ; i--) {
+ String sessionId = String.valueOf(i);
+
+ logger.infof("Before call cache2.get: %s", sessionId);
+
+ SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+ Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
+ }
+
+ logger.info("SESSIONS NOT AVAILABLE ON DC2");
+
+
+ // // Start and join workers
+// worker1.start();
+// worker2.start();
+//
+// worker1.join();
+// worker2.join();
+
+ } finally {
+ Thread.sleep(2000);
+
+ // Finish JVM
+ cache1.getCacheManager().stop();
+ cache2.getCacheManager().stop();
+ }
+
+ long took = System.currentTimeMillis() - start;
+
+// // Output
+// for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+// System.out.println(entry.getKey() + ":::" + entry.getValue());
+// worker1.cache.remove(entry.getKey());
+// }
+
+// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
+//
+// System.out.println("Sleeping before other report");
+//
+// Thread.sleep(1000);
+//
+// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+
+ }
+
+
+ private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
+ // Create 100 initial sessions
+ UserSessionEntity session = new UserSessionEntity();
+ session.setId(sessionId);
+ session.setRealm("foo");
+ session.setBrokerSessionId("!23123123");
+ session.setBrokerUserId(null);
+ session.setUser("foo");
+ session.setLoginUsername("foo");
+ session.setIpAddress("123.44.143.178");
+ session.setStarted(Time.currentTime());
+ session.setLastSessionRefresh(Time.currentTime());
+
+ AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
+ clientSession.setAuthMethod("saml");
+ clientSession.setAction("something");
+ clientSession.setTimestamp(1234);
+ clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
+ clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
+ session.getAuthenticatedClientSessions().put("client1", clientSession);
+
+ SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
+ return wrappedSession;
+ }
+
+
+ private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
+ System.out.println("Retrieved cache: " + threadId);
+
+ RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
+
+ if (threadId == 1) {
+ remoteCache1 = remoteCache;
+ } else {
+ remoteCache2 = remoteCache;
+ }
+
+ AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
+ HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
+ remoteCache.addClientListener(listener);
+
+ return new RemoteCacheWorker(remoteCache, threadId);
+ //return new CacheWorker(cache, threadId);
+ }
+
+
+ private static EmbeddedCacheManager createManager(int threadId) {
+ return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
+ }
+
+
+ @ClientListener
+ public static class HotRodListener {
+
+ private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
+ private RemoteCache remoteCache;
+ private AtomicInteger listenerCount;
+
+ public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
+ this.listenerCount = listenerCount;
+ this.remoteCache = remoteCache;
+ this.origCache = origCache;
+ }
+
+
+ @ClientCacheEntryCreated
+ public void created(ClientCacheEntryCreatedEvent event) {
+ String cacheKey = (String) event.getKey();
+
+ logger.infof("Listener executed for creating of session %s", cacheKey);
+ }
+
+
+ @ClientCacheEntryModified
+ public void modified(ClientCacheEntryModifiedEvent event) {
+ String cacheKey = (String) event.getKey();
+
+ logger.infof("Listener executed for modifying of session %s", cacheKey);
+ }
+
+
+ @ClientCacheEntryRemoved
+ public void removed(ClientCacheEntryRemovedEvent event) {
+ String cacheKey = (String) event.getKey();
+
+ logger.infof("Listener executed for removing of session %s", cacheKey);
+
+ // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
+ origCache
+ .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
+ .remove(cacheKey);
+
+ }
+
+ }
+
+ private static class RemoteCacheWorker extends Thread {
+
+ private final RemoteCache<String, Object> remoteCache;
+
+ private final int myThreadId;
+
+ private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
+ this.remoteCache = remoteCache;
+ this.myThreadId = myThreadId;
+ }
+
+ @Override
+ public void run() {
+
+ for (int i=0 ; i<ITERATIONS ; i++) {
+ String sessionId = String.valueOf(i);
+ remoteCache.remove(sessionId);
+
+
+ logger.infof("Session %s removed on DC1", sessionId);
+
+ // Check if it's immediately seen that session is removed on 2nd DC
+ RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
+ SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
+ Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
+
+ // Also check that it's immediatelly removed on my DC
+ SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
+ Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
+ }
+
+ }
+
+ }
+
+}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index 7101d38..dd34b19 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
*/
public class ConcurrencyJDGSessionsCacheTest {
- protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
+ protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
private static final int ITERATION_PER_WORKER = 1000;
@@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
//return new CacheWorker(cache, threadId);
}
- private static EmbeddedCacheManager createManager(int threadId) {
- System.setProperty("java.net.preferIPv4Stack", "true");
- System.setProperty("jgroups.tcp.port", "53715");
- GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
- boolean clustered = false;
- boolean async = false;
- boolean allowDuplicateJMXDomains = true;
-
- if (clustered) {
- gcb = gcb.clusteredDefault();
- gcb.transport().clusterName("test-clustering");
- }
-
- gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
- EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
- Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
- cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
- return cacheManager;
+ private static EmbeddedCacheManager createManager(int threadId) {
+ return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
}
- private static Configuration getCacheBackedByRemoteStore(int threadId) {
- ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
- int port = threadId==1 ? 12232 : 13232;
- //int port = 12232;
-
- return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
- .fetchPersistentState(false)
- .ignoreModifications(false)
- .purgeOnStartup(false)
- .preload(false)
- .shared(true)
- .remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
- .rawValues(true)
- .forceReturnValues(false)
- .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
- .addServer()
- .host("localhost")
- .port(port)
- .connectionPool()
- .maxActive(20)
- .exhaustedAction(ExhaustedAction.CREATE_NEW)
- .async()
- .enabled(false).build();
- }
@ClientListener
public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
new file mode 100644
index 0000000..06dd95f
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.ExhaustedAction;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+class TestCacheManagerFactory {
+
+
+ <T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ System.setProperty("jgroups.tcp.port", "53715");
+ GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+ boolean clustered = false;
+ boolean async = false;
+ boolean allowDuplicateJMXDomains = true;
+
+ if (clustered) {
+ gcb = gcb.clusteredDefault();
+ gcb.transport().clusterName("test-clustering");
+ }
+
+ gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
+
+ EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
+
+ Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
+
+ cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
+ return cacheManager;
+
+ }
+
+
+ private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
+ ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
+
+ int port = threadId==1 ? 12232 : 13232;
+ //int port = 12232;
+
+ return cacheConfigBuilder.persistence().addStore(builderClass)
+ .fetchPersistentState(false)
+ .ignoreModifications(false)
+ .purgeOnStartup(false)
+ .preload(false)
+ .shared(true)
+ .remoteCacheName(cacheName)
+ .rawValues(true)
+ .forceReturnValues(false)
+ .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
+ .addServer()
+ .host("localhost")
+ .port(port)
+ .connectionPool()
+ .maxActive(20)
+ .exhaustedAction(ExhaustedAction.CREATE_NEW)
+ .async()
+ . enabled(false).build();
+ }
+}
diff --git a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
index 11d44af..c06caa0 100755
--- a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
+++ b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
@@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
import org.keycloak.services.managers.AuthenticationManager;
import org.keycloak.services.managers.AuthenticationSessionManager;
import org.keycloak.services.managers.ClientSessionCode;
+import org.keycloak.services.managers.UserSessionCrossDCManager;
import org.keycloak.services.messages.Messages;
import org.keycloak.services.resources.LoginActionsService;
import org.keycloak.services.util.CacheControlUtil;
@@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
}
}
- UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
+ UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
if (userSession != null) {
logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);
diff --git a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
index 1d1a3be..11795e5 100644
--- a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
@@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
});
}
+
+
+ // Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
+ public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
+ UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
+
+ // This will remove userSession "locally" if it doesn't exists on remoteCache
+ kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
+ return userSession2 == null;
+ });
+
+ return kcSession.sessions().getUserSession(realm, id);
+ }
}
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
index f305013..c0eb849 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
@@ -39,8 +39,8 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
//log.info("Started to sleep");
//Thread.sleep(10000000);
- for (int i=0 ; i<10 ; i++) {
- OAuthClient.AuthorizationEndpointResponse response1 = Retry.call(() -> oauth.doLogin("test-user@localhost", "password"), 20, 100);
+ for (int i=0 ; i<30 ; i++) {
+ OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
String code = response1.getCode();
OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");
Assert.assertNotNull(response2.getAccessToken());