keycloak-aplcache

Merge pull request #4372 from mposolda/ispn-clientListeners-bugs KEYCLOAK-4187

8/7/2017 8:21:56 AM

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
index 998cbeb..f59be47 100644
--- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
@@ -23,28 +23,23 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
 import org.infinispan.client.hotrod.annotation.ClientListener;
 import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
-import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
 import org.infinispan.context.Flag;
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
-import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
 import org.infinispan.persistence.remote.RemoteStore;
@@ -52,8 +47,7 @@ import org.jboss.logging.Logger;
 import org.keycloak.cluster.ClusterEvent;
 import org.keycloak.cluster.ClusterListener;
 import org.keycloak.cluster.ClusterProvider;
-import org.keycloak.common.util.MultivaluedHashMap;
-
+import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
 /**
  * Impl for sending infinispan messages across cluster and listening to them
  *
@@ -63,7 +57,7 @@ public class InfinispanNotificationsManager {
 
     protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);
 
-    private final MultivaluedHashMap<String, ClusterListener> listeners = new MultivaluedHashMap<>();
+    private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
 
     private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
 
@@ -132,8 +126,10 @@ public class InfinispanNotificationsManager {
         wrappedEvent.setSender(myAddress);
         wrappedEvent.setSenderSite(mySite);
 
+        String eventKey = UUID.randomUUID().toString();
+
         if (logger.isTraceEnabled()) {
-            logger.tracef("Sending event: %s", event);
+            logger.tracef("Sending event with key %s: %s", eventKey, event);
         }
 
         Flag[] flags = dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY
@@ -142,7 +138,7 @@ public class InfinispanNotificationsManager {
 
         // Put the value to the cache to notify listeners on all the nodes
         workCache.getAdvancedCache().withFlags(flags)
-                .put(UUID.randomUUID().toString(), wrappedEvent, 120, TimeUnit.SECONDS);
+                .put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
     }
 
 
@@ -208,6 +204,9 @@ public class InfinispanNotificationsManager {
 
     private void eventReceived(String key, Serializable obj) {
         if (!(obj instanceof WrapperClusterEvent)) {
+            if (obj == null) {
+                logger.warnf("Event object wasn't available in remote cache after event was received. Event key: %s", key);
+            }
             return;
         }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
index b285290..c50bcf1 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java
@@ -90,7 +90,7 @@ public class LastSessionRefreshStore {
         LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
 
         if (logger.isDebugEnabled()) {
-            logger.debugf("Sending lastSessionRefreshes: %s", event.getLastSessionRefreshes().toString());
+            logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
         }
 
         // Don't notify local DC about the lastSessionRefreshes. They were processed here already
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
new file mode 100644
index 0000000..f18d8d3
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.manager.PersistenceManager;
+import org.infinispan.persistence.remote.RemoteStore;
+import org.infinispan.persistence.remote.configuration.ExhaustedAction;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+
+/**
+ * Test that hotrod ClientListeners are correctly executed as expected
+ *
+ * STEPS TO REPRODUCE:
+ * - Unzip infinispan-server-8.2.6.Final to some locations ISPN1 and ISPN2
+ *
+ * - Edit both ISPN1/standalone/configuration/clustered.xml and ISPN2/standalone/configuration/clustered.xml . Configure cache in container "clustered"
+ *
+ * 		<replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
+            <transaction mode="NON_XA" locking="PESSIMISTIC"/>
+        </replicated-cache-configuration>
+
+        <replicated-cache name="work" configuration="sessions-cfg" />
+
+    - Run server1
+ ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=1010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server
+
+    - Run server2
+ ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=2010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server-dc-2
+
+    - Run this test as main class from IDE
+ *
+ *
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyJDGRemoteCacheClientListenersTest {
+
+    // Helper map to track if listeners were executed
+    private static Map<String, EntryInfo> state = new HashMap<>();
+
+    private static AtomicInteger totalListenerCalls = new AtomicInteger(0);
+
+    private static AtomicInteger totalErrors = new AtomicInteger(0);
+
+
+    public static void main(String[] args) throws Exception {
+        // Init map somehow
+        for (int i=0 ; i<1000 ; i++) {
+            String key = "key-" + i;
+            EntryInfo entryInfo = new EntryInfo();
+            entryInfo.val.set(i);
+            state.put(key, entryInfo);
+        }
+
+        // Create caches, listeners and finally worker threads
+        Worker worker1 = createWorker(1);
+        Worker worker2 = createWorker(2);
+
+        // Note "run", so it's not executed asynchronously here!!!
+        worker1.run();
+
+//
+//        // Start and join workers
+//        worker1.start();
+//        worker2.start();
+//
+//        worker1.join();
+//        worker2.join();
+
+        // Output
+        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+            System.out.println(entry.getKey() + ":::" + entry.getValue());
+        }
+
+        System.out.println("totalListeners: " + totalListenerCalls.get() + ", totalErrors: " + totalErrors.get());
+
+
+        // Assert that ClientListener was able to read the value and save it into EntryInfo
+        try {
+            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+                EntryInfo info = entry.getValue();
+                Assert.assertEquals(info.val.get(), info.dc1Created.get());
+                Assert.assertEquals(info.val.get(), info.dc2Created.get());
+                Assert.assertEquals(info.val.get() * 2, info.dc1Updated.get());
+                Assert.assertEquals(info.val.get() * 2, info.dc2Updated.get());
+                worker1.cache.remove(entry.getKey());
+            }
+        } finally {
+            // Finish JVM
+            worker1.cache.getCacheManager().stop();
+            worker2.cache.getCacheManager().stop();
+        }
+    }
+
+    private static Worker createWorker(int threadId) {
+        EmbeddedCacheManager manager = createManager(threadId);
+        Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+
+        System.out.println("Retrieved cache: " + threadId);
+
+        RemoteStore remoteStore = cache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class).iterator().next();
+        HotRodListener listener = new HotRodListener(cache, threadId);
+        remoteStore.getRemoteCache().addClientListener(listener);
+
+        return new Worker(cache, threadId);
+    }
+
+    private static EmbeddedCacheManager createManager(int threadId) {
+        System.setProperty("java.net.preferIPv4Stack", "true");
+        System.setProperty("jgroups.tcp.port", "53715");
+        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+        boolean clustered = false;
+        boolean async = false;
+        boolean allowDuplicateJMXDomains = true;
+
+        if (clustered) {
+            gcb = gcb.clusteredDefault();
+            gcb.transport().clusterName("test-clustering");
+        }
+
+        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
+
+        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
+
+        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
+
+        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
+        return cacheManager;
+
+    }
+
+    private static Configuration getCacheBackedByRemoteStore(int threadId) {
+        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
+
+        int port = threadId==1 ? 12232 : 13232;
+        //int port = 12232;
+
+        return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
+                .fetchPersistentState(false)
+                .ignoreModifications(false)
+                .purgeOnStartup(false)
+                .preload(false)
+                .shared(true)
+                .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
+                .rawValues(true)
+                .forceReturnValues(false)
+                .addServer()
+                .host("localhost")
+                .port(port)
+                .connectionPool()
+                .maxActive(20)
+                .exhaustedAction(ExhaustedAction.CREATE_NEW)
+                .async()
+                .   enabled(false).build();
+    }
+
+
+    @ClientListener
+    public static class HotRodListener {
+
+        private final RemoteCache<String, Integer> remoteCache;
+        private final int threadId;
+
+        public HotRodListener(Cache<String, Integer> cache, int threadId) {
+            this.remoteCache = InfinispanUtil.getRemoteCache(cache);
+            this.threadId = threadId;
+        }
+
+        //private AtomicInteger listenerCount = new AtomicInteger(0);
+
+        @ClientCacheEntryCreated
+        public void created(ClientCacheEntryCreatedEvent event) {
+            String cacheKey = (String) event.getKey();
+            event(cacheKey, true);
+
+        }
+
+
+        @ClientCacheEntryModified
+        public void updated(ClientCacheEntryModifiedEvent event) {
+            String cacheKey = (String) event.getKey();
+            event(cacheKey, false);
+        }
+
+
+        private void event(String cacheKey, boolean created) {
+            EntryInfo entryInfo = state.get(cacheKey);
+            entryInfo.successfulListenerWrites.incrementAndGet();
+
+            totalListenerCalls.incrementAndGet();
+
+            Integer val = remoteCache.get(cacheKey);
+            if (val != null) {
+                AtomicInteger dcVal;
+                if (created) {
+                    dcVal = threadId == 1 ? entryInfo.dc1Created : entryInfo.dc2Created;
+                } else {
+                    dcVal = threadId == 1 ? entryInfo.dc1Updated : entryInfo.dc2Updated;
+                }
+                dcVal.set(val);
+            } else {
+                System.err.println("NOT A VALUE FOR KEY: " + cacheKey);
+                totalErrors.incrementAndGet();
+            }
+        }
+
+    }
+
+
+    private static class Worker extends Thread {
+
+        private final Cache<String, Integer> cache;
+
+        private final int myThreadId;
+
+        private Worker(Cache<String, Integer> cache, int myThreadId) {
+            this.cache = cache;
+            this.myThreadId = myThreadId;
+        }
+
+        @Override
+        public void run() {
+            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+                String cacheKey = entry.getKey();
+                Integer value = entry.getValue().val.get();
+
+                this.cache.put(cacheKey, value);
+            }
+
+            System.out.println("Worker creating finished: " + myThreadId);
+
+            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+                String cacheKey = entry.getKey();
+                Integer value = entry.getValue().val.get() * 2;
+
+                this.cache.replace(cacheKey, value);
+            }
+
+            System.out.println("Worker updating finished: " + myThreadId);
+        }
+
+    }
+
+
+    public static class EntryInfo {
+        AtomicInteger val = new AtomicInteger();
+        AtomicInteger successfulListenerWrites = new AtomicInteger(0);
+        AtomicInteger dc1Created = new AtomicInteger();
+        AtomicInteger dc2Created = new AtomicInteger();
+        AtomicInteger dc1Updated = new AtomicInteger();
+        AtomicInteger dc2Updated = new AtomicInteger();
+
+        @Override
+        public String toString() {
+            return String.format("val: %d, successfulListenerWrites: %d, dc1Created: %d, dc2Created: %d, dc1Updated: %d, dc2Updated: %d", val.get(), successfulListenerWrites.get(),
+                    dc1Created.get(), dc2Created.get(), dc1Updated.get(), dc2Updated.get());
+        }
+    }
+}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
index d86e6f8..df1b80e 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
@@ -43,11 +43,12 @@ import org.junit.Ignore;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 
 /**
- * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG
+ * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "putIfAbsent" contract.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
  *
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-@Ignore
 public class ConcurrencyJDGRemoteCacheTest {
 
     private static Map<String, EntryInfo> state = new HashMap<>();
@@ -122,8 +123,8 @@ public class ConcurrencyJDGRemoteCacheTest {
     private static Configuration getCacheBackedByRemoteStore(int threadId) {
         ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
 
-        // int port = threadId==1 ? 11222 : 11322;
-        int port = 11222;
+        int port = threadId==1 ? 12232 : 13232;
+        //int port = 12232;
 
         return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
                 .fetchPersistentState(false)
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index 056b0de..7101d38 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -39,6 +39,7 @@ import org.infinispan.persistence.manager.PersistenceManager;
 import org.infinispan.persistence.remote.RemoteStore;
 import org.infinispan.persistence.remote.configuration.ExhaustedAction;
 import org.jboss.logging.Logger;
+import org.junit.Assert;
 import org.keycloak.common.util.Time;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
@@ -50,13 +51,9 @@ import org.keycloak.models.sessions.infinispan.remotestore.KcRemoteStoreConfigur
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
 /**
- * Test requires to prepare 2 JDG (or infinispan servers) before it's runned.
- * Steps:
- * - In JDG1/standalone/configuration/clustered.xml add this: <replicated-cache name="sessions" mode="SYNC" start="EAGER"/>
- * - Same in JDG2
- * - Run JDG1 with: ./standalone.sh -c clustered.xml
- * - Run JDG2 with: ./standalone.sh -c clustered.xml -Djboss.socket.binding.port-offset=100
- * - Run this test
+ * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
  *
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
@@ -66,6 +63,9 @@ public class ConcurrencyJDGSessionsCacheTest {
 
     private static final int ITERATION_PER_WORKER = 1000;
 
+    private static RemoteCache remoteCache1;
+    private static RemoteCache remoteCache2;
+
     private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
     private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
 
@@ -176,6 +176,16 @@ public class ConcurrencyJDGSessionsCacheTest {
                 ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                 ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
 
+        System.out.println("Sleeping before other report");
+
+        Thread.sleep(1000);
+
+        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+
+
         // Finish JVM
         cache1.getCacheManager().stop();
         cache2.getCacheManager().stop();
@@ -186,7 +196,11 @@ public class ConcurrencyJDGSessionsCacheTest {
 
         RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
 
-        remoteCache.keySet();
+        if (threadId == 1) {
+            remoteCache1 = remoteCache;
+        } else {
+            remoteCache2 = remoteCache;
+        }
 
         AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
         HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
@@ -224,8 +238,8 @@ public class ConcurrencyJDGSessionsCacheTest {
     private static Configuration getCacheBackedByRemoteStore(int threadId) {
         ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
 
-        //int port = threadId==1 ? 11222 : 11322;
-        int port = 11222;
+        int port = threadId==1 ? 12232 : 13232;
+        //int port = 12232;
 
         return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
                 .fetchPersistentState(false)
@@ -288,12 +302,12 @@ public class ConcurrencyJDGSessionsCacheTest {
 
     private static class RemoteCacheWorker extends Thread {
 
-        private final RemoteCache<String, UserSessionEntity> cache;
+        private final RemoteCache<String, UserSessionEntity> remoteCache;
 
         private final int myThreadId;
 
-        private RemoteCacheWorker(RemoteCache cache, int myThreadId) {
-            this.cache = cache;
+        private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
+            this.remoteCache = remoteCache;
             this.myThreadId = myThreadId;
         }
 
@@ -306,7 +320,7 @@ public class ConcurrencyJDGSessionsCacheTest {
 
                 boolean replaced = false;
                 while (!replaced) {
-                    VersionedValue<UserSessionEntity> versioned = cache.getVersioned("123");
+                    VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
                     UserSessionEntity oldSession = versioned.getValue();
                     //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                     UserSessionEntity clone = oldSession;
@@ -315,13 +329,20 @@ public class ConcurrencyJDGSessionsCacheTest {
                     //cache.replace("123", clone);
                     replaced = cacheReplace(versioned, clone);
                 }
+
+                // Try to see if remoteCache on 2nd DC is immediatelly seeing our change
+                RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
+                UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123");
+
+                Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey));
+                //System.out.println("Passed");
             }
 
         }
 
         private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
             try {
-                boolean replaced = cache.replaceWithVersion("123", newSession, oldSession.getVersion());
+                boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
                 //cache.replace("123", newSession);
                 if (!replaced) {
                     failedReplaceCounter.incrementAndGet();
diff --git a/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl b/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
index bdd8b7c..b6fbd2e 100644
--- a/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
+++ b/testsuite/integration-arquillian/servers/cache-server/jboss/common/add-keycloak-caches.xsl
@@ -47,7 +47,7 @@
         <xsl:copy>
             <xsl:apply-templates select="@* | node()" />
 
-            <replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false">
+            <replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
                 <transaction mode="NON_XA" locking="PESSIMISTIC"/>
             </replicated-cache-configuration>
 
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
index e228270..5559a5e 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
@@ -76,6 +76,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
                     runnable.run(arrayIndex % numThreads, keycloaks.get(), keycloaks.get().realm(REALM_NAME));
                 } catch (Throwable ex) {
                     failures.add(ex);
+                    log.error(ex.getMessage(), ex);
                 }
                 return null;
             });
@@ -93,7 +94,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
         }
 
         if (! failures.isEmpty()) {
-            RuntimeException ex = new RuntimeException("There were failures in threads");
+            RuntimeException ex = new RuntimeException("There were failures in threads. Failures count: " + failures.size());
             failures.forEach(ex::addSuppressed);
             throw ex;
         }