keycloak-uncached

KEYCLOAK-5371 Fix SessionExpirationCrossDCTest, Added

10/9/2017 8:03:39 AM

Changes

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
index 330de4f..035b259 100644
--- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
@@ -97,7 +97,7 @@ public class InfinispanClusterProviderFactory implements ClusterProviderFactory 
                     String myAddress = InfinispanUtil.getMyAddress(session);
                     String mySite = InfinispanUtil.getMySite(session);
 
-                    notificationsManager = InfinispanNotificationsManager.create(workCache, myAddress, mySite, remoteStores);
+                    notificationsManager = InfinispanNotificationsManager.create(session, workCache, myAddress, mySite, remoteStores);
                 }
             }
         }
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
index 0c5e6e9..c01e14b 100644
--- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
@@ -23,6 +23,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.infinispan.Cache;
@@ -48,6 +50,8 @@ import org.keycloak.cluster.ClusterEvent;
 import org.keycloak.cluster.ClusterListener;
 import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
+import org.keycloak.executors.ExecutorsProvider;
+import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
 /**
@@ -71,17 +75,20 @@ public class InfinispanNotificationsManager {
 
     private final String mySite;
 
+    private final ExecutorService listenersExecutor;
 
-    protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite) {
+
+    protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
         this.workCache = workCache;
         this.workRemoteCache = workRemoteCache;
         this.myAddress = myAddress;
         this.mySite = mySite;
+        this.listenersExecutor = listenersExecutor;
     }
 
 
     // Create and init manager including all listeners etc
-    public static InfinispanNotificationsManager create(Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
+    public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
         RemoteCache workRemoteCache = null;
 
         if (!remoteStores.isEmpty()) {
@@ -93,7 +100,8 @@ public class InfinispanNotificationsManager {
             }
         }
 
-        InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite);
+        ExecutorService listenersExecutor = session.getProvider(ExecutorsProvider.class).getExecutor("work-cache-event-listener");
+        InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite, listenersExecutor);
 
         // We need CacheEntryListener for communication within current DC
         workCache.addListener(manager.new CacheEntryListener());
@@ -206,8 +214,20 @@ public class InfinispanNotificationsManager {
 
         private void hotrodEventReceived(String key) {
             // TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request
-            Object value = workCache.get(key);
-            eventReceived(key, (Serializable) value);
+            try {
+                listenersExecutor.submit(() -> {
+
+                    Object value = workCache.get(key);
+                    eventReceived(key, (Serializable) value);
+
+                });
+            } catch (RejectedExecutionException ree) {
+                logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key);
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug(ree.getMessage(), ree);
+                }
+            }
         }
 
     }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java
new file mode 100644
index 0000000..58b83a0
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.remotestore;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.client.hotrod.event.ClientEvent;
+import org.jboss.logging.Logger;
+import org.keycloak.common.util.MultivaluedHashMap;
+
+import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
+import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ClientListenerExecutorDecorator<K> {
+
+    private static final Logger logger = Logger.getLogger(ClientListenerExecutorDecorator.class);
+
+    private final Object lock = new Object();
+
+    private final ExecutorService decorated;
+
+    // Both "eventsInProgress" and "eventsQueue" maps are guarded by the "lock", so doesn't need to be concurrency safe
+
+    // Events currently submitted to the ExecutorService
+    private Map<K, MyClientEvent> eventsInProgress = new HashMap<>();
+
+    // Queue of the events waiting to process. We don't want events of same key to be processed concurrently
+    private MultivaluedHashMap<K, MyClientEventContext> eventsQueue = new MultivaluedHashMap<>();
+
+
+    public ClientListenerExecutorDecorator(ExecutorService decorated) {
+        this.decorated = decorated;
+    }
+
+
+    // Explicitly use 3 submit methods to ensure that different type of ClientEvent is not used
+
+    public void submit(ClientCacheEntryCreatedEvent<K> cacheEntryCreatedEvent, Runnable r) {
+        MyClientEvent event = convertIspnClientEvent(cacheEntryCreatedEvent);
+        submit(event, r);
+    }
+
+
+    public void submit(ClientCacheEntryModifiedEvent<K> cacheEntryModifiedEvent, Runnable r) {
+        MyClientEvent event = convertIspnClientEvent(cacheEntryModifiedEvent);
+        submit(event, r);
+    }
+
+
+    public void submit(ClientCacheEntryRemovedEvent<K> cacheEntryRemovedEvent, Runnable r) {
+        MyClientEvent event = convertIspnClientEvent(cacheEntryRemovedEvent);
+        submit(event, r);
+    }
+
+
+    // IMPL
+
+    private void submit(MyClientEvent event, Runnable r) {
+        K key = event.key;
+
+        synchronized (lock) {
+            if (!eventsInProgress.containsKey(key)) {
+                submitImpl(key, event, r);
+            } else {
+                putEventToTheQueue(key, event, r);
+            }
+        }
+    }
+
+
+    // Assume it's called from the synchronized block
+    private void submitImpl(K key, MyClientEvent event, Runnable r) {
+        logger.debugf("Submitting event to the executor: %s", event.toString());
+
+        eventsInProgress.put(key, event);
+
+        Runnable decoratedRunnable = () -> {
+            try {
+                r.run();
+            } finally {
+                synchronized (lock) {
+                    logger.debugf("Finished processing event by the executor: %s", event.toString());
+                    eventsInProgress.remove(key);
+
+                    pollQueue(key);
+                }
+            }
+        };
+
+        decorated.submit(decoratedRunnable);
+    }
+
+
+    // Assume it's called from the synchronized block
+    private void pollQueue(K key) {
+        if (eventsQueue.containsKey(key)) {
+            List<MyClientEventContext> events = eventsQueue.get(key);
+
+            if (events.size() > 0) {
+                MyClientEventContext nextEvent = events.remove(0);
+
+                // Was last event in the queue for that key
+                if (events.size() == 0) {
+                    eventsQueue.remove(key);
+                }
+
+                submitImpl(key, nextEvent.event, nextEvent.r);
+
+            } else {
+                // Shouldn't happen
+                throw new IllegalStateException("Illegal state. Size was 0 for key " + key);
+            }
+        }
+    }
+
+
+    // Assume it's called from the synchronized block
+    private void putEventToTheQueue(K key, MyClientEvent event, Runnable r) {
+        logger.debugf("Calling putEventToTheQueue: %s", event.toString());
+
+        if (!eventsQueue.containsKey(key)) {
+            eventsQueue.putSingle(key, new MyClientEventContext(event, r));
+        } else {
+
+            List<MyClientEventContext> existingEvents = eventsQueue.get(key);
+            MyClientEventContext myNewEvent = new MyClientEventContext(event, r);
+
+            // Try to optimize queue (EG. in case we have REMOVE event, we can ignore the previous CREATE or MODIFIED events)
+            switch (event.type) {
+                case CLIENT_CACHE_ENTRY_CREATED:
+                    boolean add = true;
+                    for (MyClientEventContext ctx : existingEvents) {
+                        if (ctx.event.type == CLIENT_CACHE_ENTRY_REMOVED) {
+                            // Ignore. TODO: Log me?
+                            add = false;
+                            break;
+                        } else if (ctx.event.type == CLIENT_CACHE_ENTRY_CREATED) {
+                            // Ignore. Already on the list
+                            add = false;
+                            break;
+                        }
+                    }
+
+                    // Add to the beginning before the MODIFIED events
+                    if (add) {
+                        existingEvents.add(0, myNewEvent);
+                    }
+                    break;
+                case CLIENT_CACHE_ENTRY_MODIFIED:
+
+                    boolean addd = true;
+                    for (int i=0 ; i<existingEvents.size() ; i++) {
+                        MyClientEventContext ctx = existingEvents.get(i);
+                        if (ctx.event.type == CLIENT_CACHE_ENTRY_REMOVED) {
+                            // Ignore.
+                            addd = false;
+                            break;
+                        } else if (ctx.event.type == CLIENT_CACHE_ENTRY_CREATED) {
+                            // Shift to the next element. CREATE event go first.
+                        } else {
+                            // Can ignore the previous MODIFY event if we have newer version
+                            if (ctx.event.version < myNewEvent.event.version) {
+                                existingEvents.remove(i);
+                            } else {
+                                addd = false;
+                            }
+                        }
+
+                        if (addd) {
+                            // Add to the end
+                            existingEvents.add(myNewEvent);
+                        }
+                    }
+                    break;
+
+                case CLIENT_CACHE_ENTRY_REMOVED:
+                    // Can just ignore the other events in the queue in case of REMOVE
+                    eventsQueue.putSingle(key, new MyClientEventContext(event, r));
+                    break;
+                default:
+                    throw new IllegalStateException("Unsupported event type: " + event.type);
+            }
+
+        }
+
+        logger.debugf("Event queued. Current events for the key '%s': %s", key.toString(), eventsQueue.getList(key));
+    }
+
+
+    public MyClientEvent convertIspnClientEvent(ClientEvent ispnClientEvent) {
+        if (ispnClientEvent instanceof ClientCacheEntryCreatedEvent) {
+            ClientCacheEntryCreatedEvent<K> ev = (ClientCacheEntryCreatedEvent<K>) ispnClientEvent;
+            return new MyClientEvent(ev.getKey(), ev.getVersion(), ev.getType());
+        } else if (ispnClientEvent instanceof ClientCacheEntryModifiedEvent) {
+            ClientCacheEntryModifiedEvent<K> ev = (ClientCacheEntryModifiedEvent<K>) ispnClientEvent;
+            return new MyClientEvent(ev.getKey(), ev.getVersion(), ev.getType());
+        } else if (ispnClientEvent instanceof ClientCacheEntryRemovedEvent) {
+            ClientCacheEntryRemovedEvent<K> ev = (ClientCacheEntryRemovedEvent<K>) ispnClientEvent;
+            return new MyClientEvent(ev.getKey(), -1l, ev.getType());
+        } else {
+            throw new IllegalStateException("Unsupported event type: " + ispnClientEvent.getType());
+        }
+    }
+
+
+    private class MyClientEventContext {
+        private final MyClientEvent event;
+        private final Runnable r;
+
+        private MyClientEventContext(MyClientEvent event, Runnable r) {
+            this.event = event;
+            this.r = r;
+        }
+
+        @Override
+        public String toString() {
+            return event.toString();
+        }
+    }
+
+
+    // Using separate class as ISPN ClientEvent type doesn't provide access to key and version :/
+    private class MyClientEvent {
+        private final K key;
+        private final long version;
+        private final ClientEvent.Type type;
+
+        private MyClientEvent(K key, long version, ClientEvent.Type type) {
+            this.key = key;
+            this.version = version;
+            this.type = type;
+        }
+
+
+        @Override
+        public String toString() {
+            return String.format("ClientEvent [ type=%s, key=%s, version=%d ]", type, key, version);
+        }
+    }
+
+}
+
+
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
index b186833..04e14f5 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
@@ -22,22 +22,20 @@ import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
-import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
 import org.infinispan.client.hotrod.annotation.ClientListener;
 import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
-import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
 import org.infinispan.client.hotrod.event.ClientEvent;
 import org.infinispan.context.Flag;
 import org.jboss.logging.Logger;
+import org.keycloak.executors.ExecutorsProvider;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
-import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 import java.util.Random;
-import java.util.logging.Level;
+import java.util.concurrent.ExecutorService;
 import org.infinispan.client.hotrod.VersionedValue;
 
 /**
@@ -52,6 +50,7 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
     private RemoteCache<K, V> remoteCache;
     private boolean distributed;
     private String myAddress;
+    private ClientListenerExecutorDecorator<K> executor;
 
 
     protected RemoteCacheSessionListener() {
@@ -68,6 +67,9 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
         } else {
             this.myAddress = null;
         }
+
+        ExecutorService executor = session.getProvider(ExecutorsProvider.class).getExecutor("client-listener-" + cache.getName());
+        this.executor = new ClientListenerExecutorDecorator<>(executor);
     }
 
 
@@ -76,8 +78,12 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
         K key = (K) event.getKey();
 
         if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
-            // Should load it from remoteStore
-            cache.get(key);
+            this.executor.submit(event, () -> {
+
+                // Should load it from remoteStore
+                cache.get(key);
+
+            });
         }
     }
 
@@ -88,7 +94,11 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
 
         if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
 
-            replaceRemoteEntityInCache(key, event.getVersion());
+            this.executor.submit(event, () -> {
+
+                replaceRemoteEntityInCache(key, event.getVersion());
+
+            });
         }
     }
 
@@ -104,10 +114,18 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
             
             SessionEntityWrapper<V> localEntityWrapper = cache.get(key);
             VersionedValue<V> remoteSessionVersioned = remoteCache.getVersioned(key);
-            if (remoteSessionVersioned == null || remoteSessionVersioned.getVersion() < eventVersion) {
+
+            // Probably already removed
+            if (remoteSessionVersioned == null) {
+                logger.debugf("Entity '%s' not present in remoteCache. Ignoring replace",
+                        key.toString());
+                return;
+            }
+
+            if (remoteSessionVersioned.getVersion() < eventVersion) {
                 try {
-                    logger.debugf("Got replace remote entity event prematurely, will try again. Event version: %d, got: %d",
-                      eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
+                    logger.debugf("Got replace remote entity event prematurely for entity '%s', will try again. Event version: %d, got: %d",
+                      key.toString(), eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
                     Thread.sleep(new Random().nextInt(sleepInterval));  // using exponential backoff
                     continue;
                 } catch (InterruptedException ex) {
@@ -116,9 +134,9 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
                     sleepInterval = sleepInterval << 1;
                 }
             }
-            SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+            SessionEntity remoteSession = remoteSessionVersioned.getValue();
 
-            logger.debugf("Read session%s. Entity read from remote cache: %s", replaceRetries > 1 ? "" : " again", remoteSession);
+            logger.debugf("Read session entity from the remote cache: %s . replaceRetries=%d", remoteSession.toString(), replaceRetries);
 
             SessionEntityWrapper<V> sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
 
@@ -127,7 +145,7 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
                     .replace(key, localEntityWrapper, sessionWrapper);
 
             if (! replaced) {
-                logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession);
+                logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession.toString());
             }
         } while (replaceRetries < MAXIMUM_REPLACE_RETRIES && ! replaced);
     }
@@ -138,16 +156,15 @@ public class RemoteCacheSessionListener<K, V extends SessionEntity>  {
         K key = (K) event.getKey();
 
         if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
-            // We received event from remoteCache, so we won't update it back
-            cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
-                    .remove(key);
-        }
-    }
 
+            this.executor.submit(event, () -> {
+
+                // We received event from remoteCache, so we won't update it back
+                cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
+                        .remove(key);
 
-    @ClientCacheFailover
-    public void failover(ClientCacheFailoverEvent event) {
-        logger.infof("Received failover event: " + event.toString());
+            });
+        }
     }
 
 
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
index b524885..0055f16 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.VersionedValue;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
 import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
 import org.infinispan.client.hotrod.annotation.ClientListener;
@@ -151,7 +152,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
         @ClientCacheEntryCreated
         public void created(ClientCacheEntryCreatedEvent event) {
             String cacheKey = (String) event.getKey();
-            event(cacheKey, true);
+            event(cacheKey, event.getVersion(), true);
 
         }
 
@@ -159,17 +160,25 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
         @ClientCacheEntryModified
         public void updated(ClientCacheEntryModifiedEvent event) {
             String cacheKey = (String) event.getKey();
-            event(cacheKey, false);
+            event(cacheKey, event.getVersion(), false);
         }
 
 
-        private void event(String cacheKey, boolean created) {
+        private void event(String cacheKey, long version, boolean created) {
             EntryInfo entryInfo = state.get(cacheKey);
             entryInfo.successfulListenerWrites.incrementAndGet();
 
             totalListenerCalls.incrementAndGet();
 
-            Integer val = remoteCache.get(cacheKey);
+            VersionedValue<Integer> versionedVal = remoteCache.getVersioned(cacheKey);
+
+            if (versionedVal.getVersion() < version) {
+                System.err.println("INCOMPATIBLE VERSION. event version: " + version + ", entity version: " + versionedVal.getVersion());
+                totalErrors.incrementAndGet();
+                return;
+            }
+
+            Integer val = versionedVal.getValue();
             if (val != null) {
                 AtomicInteger dcVal;
                 if (created) {
@@ -187,6 +196,17 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
     }
 
 
+    private static void createItems(Cache<String, Integer> cache, int myThreadId) {
+        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+            String cacheKey = entry.getKey();
+            Integer value = entry.getValue().val.get();
+
+            cache.put(cacheKey, value);
+        }
+
+        System.out.println("Worker creating finished: " + myThreadId);
+    }
+
     private static class Worker extends Thread {
 
         private final Cache<String, Integer> cache;
@@ -200,14 +220,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
 
         @Override
         public void run() {
-            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
-                String cacheKey = entry.getKey();
-                Integer value = entry.getValue().val.get();
-
-                this.cache.put(cacheKey, value);
-            }
-
-            System.out.println("Worker creating finished: " + myThreadId);
+            createItems(cache, myThreadId);
 
             for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
                 String cacheKey = entry.getKey();
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index d95c5f9..a4091a1 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -64,6 +64,8 @@ public class ConcurrencyJDGSessionsCacheTest {
     private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
     private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
 
+    private static final ConcurrencyTestHistogram histogram = new ConcurrencyTestHistogram();
+
     //private static Map<String, EntryInfo> state = new HashMap<>();
 
     public static void main(String[] args) throws Exception {
@@ -176,7 +178,8 @@ public class ConcurrencyJDGSessionsCacheTest {
                 ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                 ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
 
-
+        System.out.println("Histogram: ");
+        histogram.dumpStats();
 
         // Finish JVM
         cache1.getCacheManager().stop();
@@ -232,7 +235,14 @@ public class ConcurrencyJDGSessionsCacheTest {
             String cacheKey = (String) event.getKey();
             listenerCount.incrementAndGet();
 
-            // TODO: can be optimized
+            // TODO: can be optimized - object sent in the event
+            VersionedValue<SessionEntity> versionedVal = remoteCache.getVersioned(cacheKey);
+
+            if (versionedVal.getVersion() < event.getVersion()) {
+                System.err.println("INCOMPATIBLE VERSION. event version: " + event.getVersion() + ", entity version: " + versionedVal.getVersion());
+                return;
+            }
+
             SessionEntity session = (SessionEntity) remoteCache.get(cacheKey);
             SessionEntityWrapper sessionWrapper = new SessionEntityWrapper(session);
 
@@ -267,8 +277,12 @@ public class ConcurrencyJDGSessionsCacheTest {
 
             for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {
 
+                // Histogram will contain value 1 in all places as it's always different note and hence session is changed to different value
                 String noteKey = "n-" + myThreadId + "-" + i;
 
+                // In case it's hardcoded (eg. all the replaces are doing same change, so session is defacto not changed), then histogram may contain bigger value than 1 on some places.
+                //String noteKey = "some";
+
                 boolean replaced = false;
                 while (!replaced) {
                     VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
@@ -299,6 +313,8 @@ public class ConcurrencyJDGSessionsCacheTest {
                     failedReplaceCounter.incrementAndGet();
                     //return false;
                     //System.out.println("Replace failed!!!");
+                } else {
+                    histogram.increaseSuccessOpsCount(oldSession.getVersion());
                 }
                 return replaced;
             } catch (Exception re) {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyTestHistogram.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyTestHistogram.java
new file mode 100644
index 0000000..0bbd786
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyTestHistogram.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyTestHistogram {
+
+    private final ConcurrentMap<Long, AtomicInteger> counters = new ConcurrentHashMap<>();
+
+
+    public ConcurrencyTestHistogram() {
+
+    }
+
+
+    public void increaseSuccessOpsCount(long version) {
+        AtomicInteger counter = new AtomicInteger(0);
+        AtomicInteger existing = counters.putIfAbsent(version, counter);
+        if (existing != null) {
+            counter = existing;
+        }
+
+        counter.incrementAndGet();
+    }
+
+
+    public void dumpStats() {
+        for (Map.Entry<Long, AtomicInteger> entry : counters.entrySet()) {
+            System.out.println(entry.getKey() + "=" + entry.getValue().get());
+        }
+    }
+}
diff --git a/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProvider.java b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProvider.java
new file mode 100644
index 0000000..ebc40fe
--- /dev/null
+++ b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.executors;
+
+import java.util.concurrent.ExecutorService;
+
+import org.keycloak.provider.Provider;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ExecutorsProvider extends Provider {
+
+    /**
+     * Impl will usually return different executors for different tasks.
+     *
+     * @param taskType
+     * @return
+     */
+    ExecutorService getExecutor(String taskType);
+
+}
diff --git a/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProviderFactory.java b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProviderFactory.java
new file mode 100644
index 0000000..13dc899
--- /dev/null
+++ b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsProviderFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.executors;
+
+import org.keycloak.provider.ProviderFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ExecutorsProviderFactory extends ProviderFactory<ExecutorsProvider> {
+}
diff --git a/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsSpi.java b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsSpi.java
new file mode 100644
index 0000000..e298753
--- /dev/null
+++ b/server-spi-private/src/main/java/org/keycloak/executors/ExecutorsSpi.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.executors;
+
+import org.keycloak.provider.Provider;
+import org.keycloak.provider.ProviderFactory;
+import org.keycloak.provider.Spi;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ExecutorsSpi implements Spi {
+
+    @Override
+    public boolean isInternal() {
+        return true;
+    }
+
+    @Override
+    public String getName() {
+        return "executors";
+    }
+
+    @Override
+    public Class<? extends Provider> getProviderClass() {
+        return ExecutorsProvider.class;
+    }
+
+    @Override
+    public Class<? extends ProviderFactory> getProviderFactoryClass() {
+        return ExecutorsProviderFactory.class;
+    }
+}
diff --git a/server-spi-private/src/main/resources/META-INF/services/org.keycloak.provider.Spi b/server-spi-private/src/main/resources/META-INF/services/org.keycloak.provider.Spi
index 1413465..6db13f7 100755
--- a/server-spi-private/src/main/resources/META-INF/services/org.keycloak.provider.Spi
+++ b/server-spi-private/src/main/resources/META-INF/services/org.keycloak.provider.Spi
@@ -46,6 +46,7 @@ org.keycloak.forms.account.AccountSpi
 org.keycloak.forms.login.LoginFormsSpi
 org.keycloak.email.EmailSenderSpi
 org.keycloak.email.EmailTemplateSpi
+org.keycloak.executors.ExecutorsSpi
 org.keycloak.theme.ThemeSpi
 org.keycloak.truststore.TruststoreSpi
 org.keycloak.connections.httpclient.HttpClientSpi
diff --git a/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java
new file mode 100644
index 0000000..4c6ac3c
--- /dev/null
+++ b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.executors;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+import org.keycloak.Config;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakSessionFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory {
+
+    protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class);
+
+    private int DEFAULT_MIN_THREADS = 4;
+    private int DEFAULT_MAX_THREADS = 16;
+
+    private Config.Scope config;
+
+    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
+
+
+    @Override
+    public ExecutorsProvider create(KeycloakSession session) {
+        return new ExecutorsProvider() {
+
+            @Override
+            public ExecutorService getExecutor(String taskType) {
+                return DefaultExecutorsProviderFactory.this.getExecutor(taskType, session);
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+    }
+
+    @Override
+    public void init(Config.Scope config) {
+        this.config = config;
+    }
+
+    @Override
+    public void postInit(KeycloakSessionFactory factory) {
+
+    }
+
+    @Override
+    public void close() {
+        for (ExecutorService executor : executors.values()) {
+            executor.shutdown();
+        }
+    }
+
+    @Override
+    public String getId() {
+        return "default";
+    }
+
+
+    // IMPL
+
+    protected ExecutorService getExecutor(String taskType, KeycloakSession session) {
+        ExecutorService existing = executors.get(taskType);
+
+        if (existing == null) {
+            synchronized (this) {
+                if (!executors.containsKey(taskType)) {
+                    Config.Scope currentScope = config.scope(taskType);
+                    int min = DEFAULT_MIN_THREADS;
+                    int max = DEFAULT_MAX_THREADS;
+
+                    if (currentScope != null) {
+                        min = currentScope.getInt("min", DEFAULT_MIN_THREADS);
+                        max = currentScope.getInt("max", DEFAULT_MAX_THREADS);
+                    }
+
+                    logger.debugf("Creating pool for task '%s': min=%d, max=%d", taskType, min, max);
+                    ExecutorService executor = createPool(taskType, session, min, max);
+                    executors.put(taskType, executor);
+                }
+
+                existing = executors.get(taskType);
+            }
+        }
+
+        return existing;
+    }
+
+
+    protected ExecutorService createPool(String taskType, KeycloakSession session, int min, int max) {
+        ThreadFactory threadFactory = new ThreadFactory() {
+
+            private AtomicInteger i = new AtomicInteger(0);
+            private int group = new Random().nextInt(2048);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                int threadNumber = i.getAndIncrement();
+                String threadName = "kc-" + taskType + "-" + group + "-" + threadNumber;
+
+                if (logger.isTraceEnabled()) {
+                    logger.tracef("Creating thread: %s", threadName);
+                }
+
+                return new Thread(r, threadName);
+            }
+
+        };
+
+        if (min == max) {
+            return Executors.newFixedThreadPool(min, threadFactory);
+        } else {
+            // Same like Executors.newCachedThreadPool. Besides that "min" and "max" are configurable
+            return new ThreadPoolExecutor(min, max,
+                    60L, TimeUnit.SECONDS,
+                    new SynchronousQueue<Runnable>(),
+                    threadFactory);
+        }
+    }
+
+}
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/AttackDetectionResource.java b/services/src/main/java/org/keycloak/services/resources/admin/AttackDetectionResource.java
index c6064bb..4db698f 100755
--- a/services/src/main/java/org/keycloak/services/resources/admin/AttackDetectionResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/AttackDetectionResource.java
@@ -20,6 +20,7 @@ import org.jboss.logging.Logger;
 import org.jboss.resteasy.annotations.cache.NoCache;
 import org.jboss.resteasy.spi.NotFoundException;
 import org.keycloak.common.ClientConnection;
+import org.keycloak.common.util.Time;
 import org.keycloak.events.admin.OperationType;
 import org.keycloak.events.admin.ResourceType;
 import org.keycloak.models.KeycloakSession;
@@ -100,9 +101,17 @@ public class AttackDetectionResource {
 
         UserLoginFailureModel model = session.sessions().getUserLoginFailure(realm, userId);
         if (model == null) return data;
-        if (session.getProvider(BruteForceProtector.class).isTemporarilyDisabled(session, realm, user)) {
+
+        boolean disabled;
+        if (user == null) {
+            disabled = Time.currentTime() < model.getFailedLoginNotBefore();
+        } else {
+            disabled = session.getProvider(BruteForceProtector.class).isTemporarilyDisabled(session, realm, user);
+        }
+        if (disabled) {
             data.put("disabled", true);
         }
+
         data.put("numFailures", model.getNumFailures());
         data.put("lastFailure", model.getLastFailure());
         data.put("lastIPFailure", model.getLastIPFailure());
diff --git a/services/src/main/resources/META-INF/services/org.keycloak.executors.ExecutorsProviderFactory b/services/src/main/resources/META-INF/services/org.keycloak.executors.ExecutorsProviderFactory
new file mode 100644
index 0000000..2e4f168
--- /dev/null
+++ b/services/src/main/resources/META-INF/services/org.keycloak.executors.ExecutorsProviderFactory
@@ -0,0 +1,35 @@
+#
+# Copyright 2017 Red Hat, Inc. and/or its affiliates
+# and other contributors as indicated by the @author tags.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Copyright 2016 Red Hat, Inc. and/or its affiliates
+# and other contributors as indicated by the @author tags.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.keycloak.executors.DefaultExecutorsProviderFactory
\ No newline at end of file
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
index 219d7a2..e350584 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
@@ -206,7 +206,7 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
 
                 @Override
                 public void run(int threadIndex, Keycloak keycloak, RealmResource realm) throws Throwable {
-                    log.infof("Trying to execute codeURL: %s, threadIndex: %i", codeURL, threadIndex);
+                    log.infof("Trying to execute codeURL: %s, threadIndex: %d", codeURL, threadIndex);
 
                     OAuthClient.AccessTokenResponse resp = oauth1.doAccessTokenRequest(code, "password");
                     if (resp.getAccessToken() != null && resp.getError() == null) {
@@ -222,10 +222,11 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
 
             oauth1.openLogout();
 
+            // Code should be successfully exchanged for the token just once
             Assert.assertEquals(1, codeToTokenSuccessCount.get());
             Assert.assertEquals(DEFAULT_THREADS - 1, codeToTokenErrorsCount.get());
 
-            log.infof("Iteration %i passed successfully", i);
+            log.infof("Iteration %d passed successfully", i);
         }
 
         long end = System.currentTimeMillis() - start;
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/SessionExpirationCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/SessionExpirationCrossDCTest.java
index a7f9559..c605e38 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/SessionExpirationCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/SessionExpirationCrossDCTest.java
@@ -20,6 +20,7 @@ package org.keycloak.testsuite.crossdc;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.ws.rs.NotFoundException;
 
@@ -369,14 +370,23 @@ public class SessionExpirationCrossDCTest extends AbstractAdminCrossDCTest {
         // Logout user
         ApiUtil.findUserByUsernameId(getAdminClient().realm(REALM_NAME), "login-test").logout();
 
+        // Another increase after notBefore set
+        setTimeOffset(10);
+
         // Assert it's not possible to refresh sessions. Works because user.notBefore
-        int i = 0;
-        for (OAuthClient.AccessTokenResponse response : responses) {
-            i++;
-            OAuthClient.AccessTokenResponse refreshTokenResponse = oauth.doRefreshTokenRequest(response.getRefreshToken(), "password");
-            Assert.assertNull("Failed in iteration " + i, refreshTokenResponse.getRefreshToken());
-            Assert.assertNotNull("Failed in iteration " + i, refreshTokenResponse.getError());
-        }
+        AtomicInteger i = new AtomicInteger(0);
+        Retry.execute(() -> {
+            i.incrementAndGet();
+            int j = 0;
+            for (OAuthClient.AccessTokenResponse response : responses) {
+                j++;
+                OAuthClient.AccessTokenResponse refreshTokenResponse = oauth.doRefreshTokenRequest(response.getRefreshToken(), "password");
+                Assert.assertNull("Failed in iteration " + j, refreshTokenResponse.getRefreshToken());
+                Assert.assertNotNull("Failed in iteration " + j, refreshTokenResponse.getError());
+            }
+
+            log.infof("Passed the testLogoutUserWithFailover in the iteration: %d", i.get());
+        }, 50, 50);
     }
 
 
diff --git a/testsuite/integration-arquillian/tests/base/src/test/resources/arquillian.xml b/testsuite/integration-arquillian/tests/base/src/test/resources/arquillian.xml
index 1951857..10ca829 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/resources/arquillian.xml
+++ b/testsuite/integration-arquillian/tests/base/src/test/resources/arquillian.xml
@@ -204,6 +204,7 @@
                 <property name="javaVmArguments">
                     ${auth.server.memory.settings}
                     -Djava.net.preferIPv4Stack=true
+                    ${cache.server.crossdc1.jvm.debug.args}
                 </property>
                 <property name="outputToConsole">${cache.server.console.output}</property>
                 <property name="managementPort">${cache.server.management.port}</property>
@@ -229,6 +230,7 @@
                 <property name="javaVmArguments">
                     ${auth.server.memory.settings}
                     -Djava.net.preferIPv4Stack=true
+                    ${cache.server.crossdc2.jvm.debug.args}
                 </property>
                 <property name="outputToConsole">${cache.server.console.output}</property>
                 <property name="managementPort">${cache.server.2.management.port}</property>
diff --git a/testsuite/integration-arquillian/tests/pom.xml b/testsuite/integration-arquillian/tests/pom.xml
index 3ea5ad3..a00aa5d 100755
--- a/testsuite/integration-arquillian/tests/pom.xml
+++ b/testsuite/integration-arquillian/tests/pom.xml
@@ -413,6 +413,14 @@
                 <auth.servers.crossdc>true</auth.servers.crossdc>
                 <auth.server.undertow.crossdc>true</auth.server.undertow.crossdc>
                 <node.name>undertow</node.name>
+
+                <cache.server.crossdc1.jvm.debug.port>6001</cache.server.crossdc1.jvm.debug.port>
+                <cache.server.crossdc2.jvm.debug.port>6002</cache.server.crossdc2.jvm.debug.port>
+
+
+                <!-- default is "n", possible to override by e.g. -Dcache.server.crossdc1.debug.suspend=y -->
+                <cache.server.crossdc1.debug.suspend>${auth.server.debug.suspend}</cache.server.crossdc1.debug.suspend>
+                <cache.server.crossdc2.debug.suspend>${auth.server.debug.suspend}</cache.server.crossdc2.debug.suspend>
             </properties>
             <build>
                 <plugins>
@@ -442,6 +450,15 @@
                             <systemPropertyVariables>
                                 <node.name>${node.name}</node.name>
                                 <pageload.timeout>20000</pageload.timeout>
+
+                                <!-- TODO Same props config is duplicated for undertow. Use separate profile? -->
+                                <cache.server.crossdc1.jvm.debug.args>
+                                    -agentlib:jdwp=transport=dt_socket,server=y,suspend=${cache.server.crossdc1.debug.suspend},address=localhost:${cache.server.crossdc1.jvm.debug.port}
+                                </cache.server.crossdc1.jvm.debug.args>
+                                <cache.server.crossdc2.jvm.debug.args>
+                                    -agentlib:jdwp=transport=dt_socket,server=y,suspend=${cache.server.crossdc2.debug.suspend},address=localhost:${cache.server.crossdc2.jvm.debug.port}
+                                </cache.server.crossdc2.jvm.debug.args>
+
                             </systemPropertyVariables>
                         </configuration>
                     </plugin>
@@ -464,12 +481,16 @@
                 <!-- property specifies keycloak-add-user.json file destination -->
                 <auth.server.config.dir>${auth.server.crossdc01.home}/standalone/configuration</auth.server.config.dir>
 
+                <cache.server.crossdc1.jvm.debug.port>6001</cache.server.crossdc1.jvm.debug.port>
+                <cache.server.crossdc2.jvm.debug.port>6002</cache.server.crossdc2.jvm.debug.port>
                 <auth.server.crossdc01.jvm.debug.port>5001</auth.server.crossdc01.jvm.debug.port>
                 <auth.server.crossdc02.jvm.debug.port>5002</auth.server.crossdc02.jvm.debug.port>
                 <auth.server.crossdc11.jvm.debug.port>5011</auth.server.crossdc11.jvm.debug.port>
                 <auth.server.crossdc12.jvm.debug.port>5012</auth.server.crossdc12.jvm.debug.port>
 
                 <!-- default is "n", possible to override by e.g. -Dauth.server.crossdc01.debug.suspend=y -->
+                <cache.server.crossdc1.debug.suspend>${auth.server.debug.suspend}</cache.server.crossdc1.debug.suspend>
+                <cache.server.crossdc2.debug.suspend>${auth.server.debug.suspend}</cache.server.crossdc2.debug.suspend>
                 <auth.server.crossdc01.debug.suspend>${auth.server.debug.suspend}</auth.server.crossdc01.debug.suspend>
                 <auth.server.crossdc02.debug.suspend>${auth.server.debug.suspend}</auth.server.crossdc02.debug.suspend>
                 <auth.server.crossdc11.debug.suspend>${auth.server.debug.suspend}</auth.server.crossdc11.debug.suspend>
@@ -563,6 +584,14 @@
                                 <auth.server.crossdc11.management.port>10021</auth.server.crossdc11.management.port>
                                 <auth.server.crossdc12.management.port>10022</auth.server.crossdc12.management.port>
 
+                                <!-- TODO Same props config is duplicated for undertow. Use separate profile? -->
+                                <cache.server.crossdc1.jvm.debug.args>
+                                    -agentlib:jdwp=transport=dt_socket,server=y,suspend=${cache.server.crossdc1.debug.suspend},address=localhost:${cache.server.crossdc1.jvm.debug.port}
+                                </cache.server.crossdc1.jvm.debug.args>
+                                <cache.server.crossdc2.jvm.debug.args>
+                                    -agentlib:jdwp=transport=dt_socket,server=y,suspend=${cache.server.crossdc2.debug.suspend},address=localhost:${cache.server.crossdc2.jvm.debug.port}
+                                </cache.server.crossdc2.jvm.debug.args>
+
                                 <auth.server.crossdc01.jvm.debug.args>
                                     -agentlib:jdwp=transport=dt_socket,server=y,suspend=${auth.server.crossdc01.debug.suspend},address=localhost:${auth.server.crossdc01.jvm.debug.port}
                                 </auth.server.crossdc01.jvm.debug.args>