keycloak-aplcache
Merge pull request #4372 from mposolda/ispn-clientListeners-bugs KEYCLOAK-4187 …
Changes
model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java 19(+9 -10)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java 2(+1 -1)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java 295(+295 -0)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java 9(+5 -4)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
index 998cbeb..f59be47 100644
--- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
@@ -23,28 +23,23 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
-import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.context.Flag;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
-import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.persistence.remote.RemoteStore;
@@ -52,8 +47,7 @@ import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
-import org.keycloak.common.util.MultivaluedHashMap;
-
+import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
/**
* Impl for sending infinispan messages across cluster and listening to them
*
@@ -63,7 +57,7 @@ public class InfinispanNotificationsManager {
protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);
- private final MultivaluedHashMap<String, ClusterListener> listeners = new MultivaluedHashMap<>();
+ private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
@@ -132,8 +126,10 @@ public class InfinispanNotificationsManager {
wrappedEvent.setSender(myAddress);
wrappedEvent.setSenderSite(mySite);
+ String eventKey = UUID.randomUUID().toString();
+
if (logger.isTraceEnabled()) {
- logger.tracef("Sending event: %s", event);
+ logger.tracef("Sending event with key %s: %s", eventKey, event);
}
Flag[] flags = dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY
@@ -142,7 +138,7 @@ public class InfinispanNotificationsManager {
// Put the value to the cache to notify listeners on all the nodes
workCache.getAdvancedCache().withFlags(flags)
- .put(UUID.randomUUID().toString(), wrappedEvent, 120, TimeUnit.SECONDS);
+ .put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
}
@@ -208,6 +204,9 @@ public class InfinispanNotificationsManager {
private void eventReceived(String key, Serializable obj) {
if (!(obj instanceof WrapperClusterEvent)) {
+ if (obj == null) {
+ logger.warnf("Event object wasn't available in remote cache after event was received. Event key: %s", key);
+ }
return;
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
index b285290..c50bcf1 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
@@ -90,7 +90,7 @@ public class LastSessionRefreshStore {
LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
if (logger.isDebugEnabled()) {
- logger.debugf("Sending lastSessionRefreshes: %s", event.getLastSessionRefreshes().toString());
+ logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
}
// Don't notify local DC about the lastSessionRefreshes. They were processed here already
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
new file mode 100644
index 0000000..f18d8d3
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.manager.PersistenceManager;
+import org.infinispan.persistence.remote.RemoteStore;
+import org.infinispan.persistence.remote.configuration.ExhaustedAction;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+
+/**
+ * Test that hotrod ClientListeners are correctly executed as expected
+ *
+ * STEPS TO REPRODUCE:
+ * - Unzip infinispan-server-8.2.6.Final to some locations ISPN1 and ISPN2
+ *
+ * - Edit both ISPN1/standalone/configuration/clustered.xml and ISPN2/standalone/configuration/clustered.xml . Configure cache in container "clustered"
+ *
+ * <replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
+ <transaction mode="NON_XA" locking="PESSIMISTIC"/>
+ </replicated-cache-configuration>
+
+ <replicated-cache name="work" configuration="sessions-cfg" />
+
+ - Run server1
+ ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=1010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server
+
+ - Run server2
+ ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=2010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server-dc-2
+
+ - Run this test as main class from IDE
+ *
+ *
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyJDGRemoteCacheClientListenersTest {
+
+ // Helper map to track if listeners were executed
+ private static Map<String, EntryInfo> state = new HashMap<>();
+
+ private static AtomicInteger totalListenerCalls = new AtomicInteger(0);
+
+ private static AtomicInteger totalErrors = new AtomicInteger(0);
+
+
+ public static void main(String[] args) throws Exception {
+ // Init map somehow
+ for (int i=0 ; i<1000 ; i++) {
+ String key = "key-" + i;
+ EntryInfo entryInfo = new EntryInfo();
+ entryInfo.val.set(i);
+ state.put(key, entryInfo);
+ }
+
+ // Create caches, listeners and finally worker threads
+ Worker worker1 = createWorker(1);
+ Worker worker2 = createWorker(2);
+
+ // Note "run", so it's not executed asynchronously here!!!
+ worker1.run();
+
+//
+// // Start and join workers
+// worker1.start();
+// worker2.start();
+//
+// worker1.join();
+// worker2.join();
+
+ // Output
+ for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+ System.out.println(entry.getKey() + ":::" + entry.getValue());
+ }
+
+ System.out.println("totalListeners: " + totalListenerCalls.get() + ", totalErrors: " + totalErrors.get());
+
+
+ // Assert that ClientListener was able to read the value and save it into EntryInfo
+ try {
+ for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+ EntryInfo info = entry.getValue();
+ Assert.assertEquals(info.val.get(), info.dc1Created.get());
+ Assert.assertEquals(info.val.get(), info.dc2Created.get());
+ Assert.assertEquals(info.val.get() * 2, info.dc1Updated.get());
+ Assert.assertEquals(info.val.get() * 2, info.dc2Updated.get());
+ worker1.cache.remove(entry.getKey());
+ }
+ } finally {
+ // Finish JVM
+ worker1.cache.getCacheManager().stop();
+ worker2.cache.getCacheManager().stop();
+ }
+ }
+
+ private static Worker createWorker(int threadId) {
+ EmbeddedCacheManager manager = createManager(threadId);
+ Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+
+ System.out.println("Retrieved cache: " + threadId);
+
+ RemoteStore remoteStore = cache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class).iterator().next();
+ HotRodListener listener = new HotRodListener(cache, threadId);
+ remoteStore.getRemoteCache().addClientListener(listener);
+
+ return new Worker(cache, threadId);
+ }
+
+ private static EmbeddedCacheManager createManager(int threadId) {
+ System.setProperty("java.net.preferIPv4Stack", "true");
+ System.setProperty("jgroups.tcp.port", "53715");
+ GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+ boolean clustered = false;
+ boolean async = false;
+ boolean allowDuplicateJMXDomains = true;
+
+ if (clustered) {
+ gcb = gcb.clusteredDefault();
+ gcb.transport().clusterName("test-clustering");
+ }
+
+ gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
+
+ EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
+
+ Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
+
+ cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
+ return cacheManager;
+
+ }
+
+ private static Configuration getCacheBackedByRemoteStore(int threadId) {
+ ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
+
+ int port = threadId==1 ? 12232 : 13232;
+ //int port = 12232;
+
+ return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
+ .fetchPersistentState(false)
+ .ignoreModifications(false)
+ .purgeOnStartup(false)
+ .preload(false)
+ .shared(true)
+ .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
+ .rawValues(true)
+ .forceReturnValues(false)
+ .addServer()
+ .host("localhost")
+ .port(port)
+ .connectionPool()
+ .maxActive(20)
+ .exhaustedAction(ExhaustedAction.CREATE_NEW)
+ .async()
+ . enabled(false).build();
+ }
+
+
+ @ClientListener
+ public static class HotRodListener {
+
+ private final RemoteCache<String, Integer> remoteCache;
+ private final int threadId;
+
+ public HotRodListener(Cache<String, Integer> cache, int threadId) {
+ this.remoteCache = InfinispanUtil.getRemoteCache(cache);
+ this.threadId = threadId;
+ }
+
+ //private AtomicInteger listenerCount = new AtomicInteger(0);
+
+ @ClientCacheEntryCreated
+ public void created(ClientCacheEntryCreatedEvent event) {
+ String cacheKey = (String) event.getKey();
+ event(cacheKey, true);
+
+ }
+
+
+ @ClientCacheEntryModified
+ public void updated(ClientCacheEntryModifiedEvent event) {
+ String cacheKey = (String) event.getKey();
+ event(cacheKey, false);
+ }
+
+
+ private void event(String cacheKey, boolean created) {
+ EntryInfo entryInfo = state.get(cacheKey);
+ entryInfo.successfulListenerWrites.incrementAndGet();
+
+ totalListenerCalls.incrementAndGet();
+
+ Integer val = remoteCache.get(cacheKey);
+ if (val != null) {
+ AtomicInteger dcVal;
+ if (created) {
+ dcVal = threadId == 1 ? entryInfo.dc1Created : entryInfo.dc2Created;
+ } else {
+ dcVal = threadId == 1 ? entryInfo.dc1Updated : entryInfo.dc2Updated;
+ }
+ dcVal.set(val);
+ } else {
+ System.err.println("NOT A VALUE FOR KEY: " + cacheKey);
+ totalErrors.incrementAndGet();
+ }
+ }
+
+ }
+
+
+ private static class Worker extends Thread {
+
+ private final Cache<String, Integer> cache;
+
+ private final int myThreadId;
+
+ private Worker(Cache<String, Integer> cache, int myThreadId) {
+ this.cache = cache;
+ this.myThreadId = myThreadId;
+ }
+
+ @Override
+ public void run() {
+ for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+ String cacheKey = entry.getKey();
+ Integer value = entry.getValue().val.get();
+
+ this.cache.put(cacheKey, value);
+ }
+
+ System.out.println("Worker creating finished: " + myThreadId);
+
+ for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+ String cacheKey = entry.getKey();
+ Integer value = entry.getValue().val.get() * 2;
+
+ this.cache.replace(cacheKey, value);
+ }
+
+ System.out.println("Worker updating finished: " + myThreadId);
+ }
+
+ }
+
+
+ public static class EntryInfo {
+ AtomicInteger val = new AtomicInteger();
+ AtomicInteger successfulListenerWrites = new AtomicInteger(0);
+ AtomicInteger dc1Created = new AtomicInteger();
+ AtomicInteger dc2Created = new AtomicInteger();
+ AtomicInteger dc1Updated = new AtomicInteger();
+ AtomicInteger dc2Updated = new AtomicInteger();
+
+ @Override
+ public String toString() {
+ return String.format("val: %d, successfulListenerWrites: %d, dc1Created: %d, dc2Created: %d, dc1Updated: %d, dc2Updated: %d", val.get(), successfulListenerWrites.get(),
+ dc1Created.get(), dc2Created.get(), dc1Updated.get(), dc2Updated.get());
+ }
+ }
+}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
index d86e6f8..df1b80e 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
@@ -43,11 +43,12 @@ import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
/**
- * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG
+ * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "putIfAbsent" contract.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-@Ignore
public class ConcurrencyJDGRemoteCacheTest {
private static Map<String, EntryInfo> state = new HashMap<>();
@@ -122,8 +123,8 @@ public class ConcurrencyJDGRemoteCacheTest {
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
- // int port = threadId==1 ? 11222 : 11322;
- int port = 11222;
+ int port = threadId==1 ? 12232 : 13232;
+ //int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index 056b0de..7101d38 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -39,6 +39,7 @@ import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.jboss.logging.Logger;
+import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
@@ -50,13 +51,9 @@ import org.keycloak.models.sessions.infinispan.remotestore.KcRemoteStoreConfigur
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
- * Test requires to prepare 2 JDG (or infinispan servers) before it's runned.
- * Steps:
- * - In JDG1/standalone/configuration/clustered.xml add this: <replicated-cache name="sessions" mode="SYNC" start="EAGER"/>
- * - Same in JDG2
- * - Run JDG1 with: ./standalone.sh -c clustered.xml
- * - Run JDG2 with: ./standalone.sh -c clustered.xml -Djboss.socket.binding.port-offset=100
- * - Run this test
+ * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@@ -66,6 +63,9 @@ public class ConcurrencyJDGSessionsCacheTest {
private static final int ITERATION_PER_WORKER = 1000;
+ private static RemoteCache remoteCache1;
+ private static RemoteCache remoteCache2;
+
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
@@ -176,6 +176,16 @@ public class ConcurrencyJDGSessionsCacheTest {
", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
+ System.out.println("Sleeping before other report");
+
+ Thread.sleep(1000);
+
+ System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+ ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+ ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+
+
// Finish JVM
cache1.getCacheManager().stop();
cache2.getCacheManager().stop();
@@ -186,7 +196,11 @@ public class ConcurrencyJDGSessionsCacheTest {
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
- remoteCache.keySet();
+ if (threadId == 1) {
+ remoteCache1 = remoteCache;
+ } else {
+ remoteCache2 = remoteCache;
+ }
AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
@@ -224,8 +238,8 @@ public class ConcurrencyJDGSessionsCacheTest {
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
- //int port = threadId==1 ? 11222 : 11322;
- int port = 11222;
+ int port = threadId==1 ? 12232 : 13232;
+ //int port = 12232;
return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
@@ -288,12 +302,12 @@ public class ConcurrencyJDGSessionsCacheTest {
private static class RemoteCacheWorker extends Thread {
- private final RemoteCache<String, UserSessionEntity> cache;
+ private final RemoteCache<String, UserSessionEntity> remoteCache;
private final int myThreadId;
- private RemoteCacheWorker(RemoteCache cache, int myThreadId) {
- this.cache = cache;
+ private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
+ this.remoteCache = remoteCache;
this.myThreadId = myThreadId;
}
@@ -306,7 +320,7 @@ public class ConcurrencyJDGSessionsCacheTest {
boolean replaced = false;
while (!replaced) {
- VersionedValue<UserSessionEntity> versioned = cache.getVersioned("123");
+ VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
UserSessionEntity oldSession = versioned.getValue();
//UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
UserSessionEntity clone = oldSession;
@@ -315,13 +329,20 @@ public class ConcurrencyJDGSessionsCacheTest {
//cache.replace("123", clone);
replaced = cacheReplace(versioned, clone);
}
+
+ // Try to see if remoteCache on 2nd DC is immediatelly seeing our change
+ RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
+ UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123");
+
+ Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey));
+ //System.out.println("Passed");
}
}
private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try {
- boolean replaced = cache.replaceWithVersion("123", newSession, oldSession.getVersion());
+ boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
//cache.replace("123", newSession);
if (!replaced) {
failedReplaceCounter.incrementAndGet();
diff --git a/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl b/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
index bdd8b7c..b6fbd2e 100644
--- a/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
+++ b/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
@@ -47,7 +47,7 @@
<xsl:copy>
<xsl:apply-templates select="@* | node()" />
- <replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false">
+ <replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
<transaction mode="NON_XA" locking="PESSIMISTIC"/>
</replicated-cache-configuration>
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
index e228270..5559a5e 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
@@ -76,6 +76,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
runnable.run(arrayIndex % numThreads, keycloaks.get(), keycloaks.get().realm(REALM_NAME));
} catch (Throwable ex) {
failures.add(ex);
+ log.error(ex.getMessage(), ex);
}
return null;
});
@@ -93,7 +94,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
}
if (! failures.isEmpty()) {
- RuntimeException ex = new RuntimeException("There were failures in threads");
+ RuntimeException ex = new RuntimeException("There were failures in threads. Failures count: " + failures.size());
failures.forEach(ex::addSuppressed);
throw ex;
}