keycloak-uncached

Changes

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
index d3bcacc..43bb2b3 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java
@@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
 
             return wrappedEntity;
         } else {
-            return myUpdates.getEntityWrapper();
+            S entity = myUpdates.getEntityWrapper().getEntity();
+
+            // If entity is scheduled for remove, we don't return it.
+            boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
+
+                return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
+
+            }).findFirst().isPresent();
+
+            return scheduledForRemove ? null : myUpdates.getEntityWrapper();
         }
     }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java
index 695401d..1f24f84 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java
@@ -95,5 +95,10 @@ class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<S> {
         return result;
     }
 
+    @Override
+    public String toString() {
+        return "MergedUpdate" + childUpdates;
+    }
+
 
 }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionEntityWrapper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionEntityWrapper.java
index 400a1cd..ca21487 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionEntityWrapper.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionEntityWrapper.java
@@ -117,6 +117,10 @@ public class SessionEntityWrapper<S extends SessionEntity> {
                 + Objects.hashCode(entity);
     }
 
+    @Override
+    public String toString() {
+        return "SessionEntityWrapper{" + "version=" + version + ", entity=" + entity + ", localMetadata=" + localMetadata + '}';
+    }
 
     public static class ExternalizerImpl implements Externalizer<SessionEntityWrapper> {
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
index 4e349f6..f9adf9b 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java
@@ -41,10 +41,6 @@ public class LastSessionRefreshChecker {
     }
 
 
-    // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
-    public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
-
-
     public SessionUpdateTask.CrossDCMessageStatus getCrossDCMessageStatus(KeycloakSession kcSession, RealmModel realm, SessionEntityWrapper<UserSessionEntity> sessionWrapper, boolean offline, int newLastSessionRefresh) {
         // revokeRefreshToken always writes everything to remoteCache immediately
         if (realm.isRevokeRefreshToken()) {
@@ -62,7 +58,7 @@ public class LastSessionRefreshChecker {
             return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
         }
 
-        Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(LAST_SESSION_REFRESH_REMOTE);
+        Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE);
         if (lsrr == null) {
             logger.warnf("Not available lsrr note on user session %s.", sessionWrapper.getEntity().getId());
             return SessionUpdateTask.CrossDCMessageStatus.SYNC;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/SessionEntity.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/SessionEntity.java
index feca10e..25ac2a4 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/SessionEntity.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/SessionEntity.java
@@ -24,7 +24,7 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 /**
  * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
  */
-public class SessionEntity implements Serializable {
+public abstract class SessionEntity implements Serializable {
 
     private String id;
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
index d8a1e6c..5d0edb0 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/entities/UserSessionEntity.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Map;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -39,7 +40,7 @@ public class UserSessionEntity extends SessionEntity {
 
     public static final Logger logger = Logger.getLogger(UserSessionEntity.class);
 
-    // Tracks the "lastSessionRefresh" from userSession entity from remote cache
+    // Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
     public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
 
     private String user;
@@ -163,7 +164,8 @@ public class UserSessionEntity extends SessionEntity {
 
     @Override
     public String toString() {
-        return String.format("UserSessionEntity [ id=%s, realm=%s, lastSessionRefresh=%d]", getId(), getRealm(), getLastSessionRefresh());
+        return String.format("UserSessionEntity [id=%s, realm=%s, lastSessionRefresh=%d, clients=%s]", getId(), getRealm(), getLastSessionRefresh(),
+          new TreeSet(this.authenticatedClientSessions.keySet()));
     }
 
     @Override
@@ -194,8 +196,12 @@ public class UserSessionEntity extends SessionEntity {
 
     public static class ExternalizerImpl implements Externalizer<UserSessionEntity> {
 
+        private static final int VERSION_1 = 1;
+
         @Override
         public void writeObject(ObjectOutput output, UserSessionEntity session) throws IOException {
+            output.writeByte(VERSION_1);
+
             MarshallUtil.marshallString(session.getAuthMethod(), output);
             MarshallUtil.marshallString(session.getBrokerSessionId(), output);
             MarshallUtil.marshallString(session.getBrokerUserId(), output);
@@ -223,6 +229,15 @@ public class UserSessionEntity extends SessionEntity {
 
         @Override
         public UserSessionEntity readObject(ObjectInput input) throws IOException, ClassNotFoundException {
+            switch (input.readByte()) {
+                case VERSION_1:
+                    return readObjectVersion1(input);
+                default:
+                    throw new IOException("Unknown version");
+            }
+        }
+
+        public UserSessionEntity readObjectVersion1(ObjectInput input) throws IOException, ClassNotFoundException {
             UserSessionEntity sessionEntity = new UserSessionEntity();
 
             sessionEntity.setAuthMethod(MarshallUtil.unmarshallString(input));
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index ced77fb..7f10f9e 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -292,6 +292,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
         // We have userSession, which passes predicate. No need for remote lookup.
         if (predicate.test(userSession)) {
+            log.debugf("getUserSessionWithPredicate(%s): found in local cache", id);
             return userSession;
         }
 
@@ -302,6 +303,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         if (remoteCache != null) {
             UserSessionEntity remoteSessionEntity = (UserSessionEntity) remoteCache.get(id);
             if (remoteSessionEntity != null) {
+                log.debugf("getUserSessionWithPredicate(%s): remote cache contains session entity %s", id, remoteSessionEntity);
 
                 UserSessionModel remoteSessionAdapter = wrap(realm, remoteSessionEntity, offline);
                 if (predicate.test(remoteSessionAdapter)) {
@@ -319,11 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
                     // Recursion. We should have it locally now
                     return getUserSessionWithPredicate(realm, id, offline, predicate);
+                } else {
+                    log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
+
+                    return null;
                 }
+            } else {
+                log.debugf("getUserSessionWithPredicate(%s): not found", id);
+
+                // Session not available on remoteCache. Was already removed there. So removing locally too.
+                // TODO: Can be optimized to skip calling remoteCache.remove
+                removeUserSession(realm, userSession);
+
+                return null;
             }
-        }
+        } else {
+
+            log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
 
-        return null;
+            return null;
+        }
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
index 3d31122..60c34bf 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java
@@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
     public boolean delete(Object key) throws PersistenceException {
         logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
 
-        // Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
-        getRemoteCache().removeAsync(key);
+        // Optimization - we don't need to know the previous value.
+        getRemoteCache().remove(key);
 
         return true;
     }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index 424a2f7..89fd215 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -17,6 +17,7 @@
 
 package org.keycloak.models.sessions.infinispan.remotestore;
 
+import org.keycloak.common.util.Time;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -65,7 +67,7 @@ public class RemoteCacheInvoker {
         SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
 
         if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
-            logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation.toString());
+            logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
             return;
         }
 
@@ -76,11 +78,12 @@ public class RemoteCacheInvoker {
 
         logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
 
-        runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, session);
+        runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
     }
 
 
-    private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, S session) {
+    private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, SessionEntityWrapper<S> sessionWrapper) {
+        S session = sessionWrapper.getEntity();
         SessionUpdateTask.CacheOperation operation = task.getOperation(session);
 
         switch (operation) {
@@ -92,12 +95,14 @@ public class RemoteCacheInvoker {
                 remoteCache.put(key, session, task.getLifespanMs(), TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
                 break;
             case ADD_IF_ABSENT:
+                final int currentTime = Time.currentTime();
                 SessionEntity existing = (SessionEntity) remoteCache
                         .withFlags(Flag.FORCE_RETURN_VALUE)
                         .putIfAbsent(key, session, -1, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
                 if (existing != null) {
                     throw new IllegalStateException("There is already existing value in cache for key " + key);
                 }
+                sessionWrapper.putLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE, currentTime);
                 break;
             case REPLACE:
                 replace(remoteCache, task.getLifespanMs(), maxIdleMs, key, task);
@@ -122,17 +127,15 @@ public class RemoteCacheInvoker {
             // Run task on the remote session
             task.runUpdate(session);
 
-            if (logger.isDebugEnabled()) {
-                logger.debugf("Before replaceWithVersion. Written entity: %s", session.toString());
-            }
+            logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
 
             replaced = remoteCache.replaceWithVersion(key, session, versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
 
             if (!replaced) {
-                logger.debugf("Failed to replace entity '%s' . Will retry again", key);
+                logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
             } else {
                 if (logger.isDebugEnabled()) {
-                    logger.debugf("Replaced entity in remote cache: %s", session.toString());
+                    logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
                 }
             }
         }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
index fa7719e..d29e220 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionListener.java
@@ -36,6 +36,9 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+import java.util.Random;
+import java.util.logging.Level;
+import org.infinispan.client.hotrod.VersionedValue;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -85,25 +88,48 @@ public class RemoteCacheSessionListener  {
 
         if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
 
-            replaceRemoteEntityInCache(key);
+            replaceRemoteEntityInCache(key, event.getVersion());
         }
     }
 
+    private static final int MAXIMUM_REPLACE_RETRIES = 10;
 
-    private void replaceRemoteEntityInCache(String key) {
+    private void replaceRemoteEntityInCache(String key, long eventVersion) {
         // TODO can be optimized and remoteSession sent in the event itself?
-        SessionEntityWrapper localEntityWrapper = cache.get(key);
-        SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+        boolean replaced = false;
+        int replaceRetries = 0;
+        int sleepInterval = 25;
+        do {
+            replaceRetries++;
+            
+            SessionEntityWrapper localEntityWrapper = cache.get(key);
+            VersionedValue remoteSessionVersioned = remoteCache.getVersioned(key);
+            if (remoteSessionVersioned == null || remoteSessionVersioned.getVersion() < eventVersion) {
+                try {
+                    logger.debugf("Got replace remote entity event prematurely, will try again. Event version: %d, got: %d",
+                      eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
+                    Thread.sleep(new Random().nextInt(sleepInterval));  // using exponential backoff
+                    continue;
+                } catch (InterruptedException ex) {
+                    continue;
+                } finally {
+                    sleepInterval = sleepInterval << 1;
+                }
+            }
+            SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
+
+            logger.debugf("Read session%s. Entity read from remote cache: %s", replaceRetries > 1 ? "" : " again", remoteSession);
+
+            SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
 
-        if (logger.isDebugEnabled()) {
-            logger.debugf("Read session. Entity read from remote cache: %s", remoteSession.toString());
-        }
-
-        SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
+            // We received event from remoteCache, so we won't update it back
+            replaced = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
+                    .replace(key, localEntityWrapper, sessionWrapper);
 
-        // We received event from remoteCache, so we won't update it back
-        cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
-                .replace(key, sessionWrapper);
+            if (! replaced) {
+                logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession);
+            }
+        } while (replaceRetries < MAXIMUM_REPLACE_RETRIES && ! replaced);
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
index b8df605..3f09773 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
@@ -163,6 +163,11 @@ public class UserSessionAdapter implements UserSessionModel {
                 return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
                         .getCrossDCMessageStatus(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
             }
+
+            @Override
+            public String toString() {
+                return "setLastSessionRefresh(" + lastSessionRefresh + ')';
+            }
         };
 
         update(task);
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
index f18d8d3..b524885 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java
@@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
 import org.infinispan.client.hotrod.annotation.ClientListener;
 import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
-import org.infinispan.configuration.cache.Configuration;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.persistence.manager.PersistenceManager;
 import org.infinispan.persistence.remote.RemoteStore;
-import org.infinispan.persistence.remote.configuration.ExhaustedAction;
 import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
@@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
     }
 
     private static Worker createWorker(int threadId) {
-        EmbeddedCacheManager manager = createManager(threadId);
+        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
         Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
 
         System.out.println("Retrieved cache: " + threadId);
@@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
         return new Worker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
-
-    }
-
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .addServer()
-                .host("localhost")
-                .port(port)
-                .connectionPool()
-                .maxActive(20)
-                .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                .   enabled(false).build();
-    }
-
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
index df1b80e..9c23452 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java
@@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
     }
 
     private static Worker createWorker(int threadId) {
-        EmbeddedCacheManager manager = createManager(threadId);
+        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
         Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
 
         System.out.println("Retrieved cache: " + threadId);
@@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
         return new Worker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
-
-    }
-
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .addServer()
-                    .host("localhost")
-                    .port(port)
-                .connectionPool()
-                    .maxActive(20)
-                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                .   enabled(false).build();
-    }
-
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
new file mode 100644
index 0000000..ff4c3ce
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.context.Flag;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.keycloak.common.util.Time;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
+
+/**
+ * Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
+ *
+ * Also check that listeners are executed asynchronously with some delay.
+ *
+ * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ConcurrencyJDGRemoveSessionTest {
+
+    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
+
+    private static final int ITERATIONS = 10000;
+
+    private static RemoteCache remoteCache1;
+    private static RemoteCache remoteCache2;
+
+    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
+    private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
+
+    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
+    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
+
+    //private static Map<String, EntryInfo> state = new HashMap<>();
+
+    public static void main(String[] args) throws Exception {
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
+
+        // Create caches, listeners and finally worker threads
+        Thread worker1 = createWorker(cache1, 1);
+        Thread worker2 = createWorker(cache2, 2);
+
+        // Create 100 initial sessions
+        for (int i=0 ; i<ITERATIONS ; i++) {
+            String sessionId = String.valueOf(i);
+            SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
+            cache1.put(sessionId, wrappedSession);
+        }
+
+        logger.info("SESSIONS CREATED");
+
+        // Create 100 initial sessions
+        for (int i=0 ; i<ITERATIONS ; i++) {
+            String sessionId = String.valueOf(i);
+            SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+            Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
+        }
+
+        logger.info("SESSIONS AVAILABLE ON DC2");
+
+
+        long start = System.currentTimeMillis();
+
+        try {
+            // Just running in current thread
+            worker1.run();
+
+            logger.info("SESSIONS REMOVED");
+
+            //Thread.sleep(5000);
+
+            // Doing it in opposite direction to ensure that newer are checked first.
+            // This us currently FAILING (expected) as listeners are executed asynchronously.
+            for (int i=ITERATIONS-1 ; i>=0 ; i--) {
+                String sessionId = String.valueOf(i);
+
+                logger.infof("Before call cache2.get: %s", sessionId);
+
+                SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
+                Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
+            }
+
+            logger.info("SESSIONS NOT AVAILABLE ON DC2");
+
+
+            //        // Start and join workers
+//        worker1.start();
+//        worker2.start();
+//
+//        worker1.join();
+//        worker2.join();
+
+        } finally {
+            Thread.sleep(2000);
+
+            // Finish JVM
+            cache1.getCacheManager().stop();
+            cache2.getCacheManager().stop();
+        }
+
+        long took = System.currentTimeMillis() - start;
+
+//        // Output
+//        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
+//            System.out.println(entry.getKey() + ":::" + entry.getValue());
+//            worker1.cache.remove(entry.getKey());
+//        }
+
+//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
+//
+//        System.out.println("Sleeping before other report");
+//
+//        Thread.sleep(1000);
+//
+//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
+//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
+//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+
+
+    }
+
+
+    private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
+        // Create 100 initial sessions
+        UserSessionEntity session = new UserSessionEntity();
+        session.setId(sessionId);
+        session.setRealm("foo");
+        session.setBrokerSessionId("!23123123");
+        session.setBrokerUserId(null);
+        session.setUser("foo");
+        session.setLoginUsername("foo");
+        session.setIpAddress("123.44.143.178");
+        session.setStarted(Time.currentTime());
+        session.setLastSessionRefresh(Time.currentTime());
+
+        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
+        clientSession.setAuthMethod("saml");
+        clientSession.setAction("something");
+        clientSession.setTimestamp(1234);
+        clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
+        clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
+        session.getAuthenticatedClientSessions().put("client1", clientSession);
+
+        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
+        return wrappedSession;
+    }
+
+
+    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
+        System.out.println("Retrieved cache: " + threadId);
+
+        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
+
+        if (threadId == 1) {
+            remoteCache1 = remoteCache;
+        } else {
+            remoteCache2 = remoteCache;
+        }
+
+        AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
+        HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
+        remoteCache.addClientListener(listener);
+
+        return new RemoteCacheWorker(remoteCache, threadId);
+        //return new CacheWorker(cache, threadId);
+    }
+
+
+    private static EmbeddedCacheManager createManager(int threadId) {
+        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
+    }
+
+
+    @ClientListener
+    public static class HotRodListener {
+
+        private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
+        private RemoteCache remoteCache;
+        private AtomicInteger listenerCount;
+
+        public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
+            this.listenerCount = listenerCount;
+            this.remoteCache = remoteCache;
+            this.origCache = origCache;
+        }
+
+
+        @ClientCacheEntryCreated
+        public void created(ClientCacheEntryCreatedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for creating of session %s", cacheKey);
+        }
+
+
+        @ClientCacheEntryModified
+        public void modified(ClientCacheEntryModifiedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for modifying of session %s", cacheKey);
+        }
+
+
+        @ClientCacheEntryRemoved
+        public void removed(ClientCacheEntryRemovedEvent event) {
+            String cacheKey = (String) event.getKey();
+
+            logger.infof("Listener executed for removing of session %s", cacheKey);
+
+            // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
+            origCache
+                    .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
+                    .remove(cacheKey);
+
+        }
+
+    }
+
+    private static class RemoteCacheWorker extends Thread {
+
+        private final RemoteCache<String, Object> remoteCache;
+
+        private final int myThreadId;
+
+        private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
+            this.remoteCache = remoteCache;
+            this.myThreadId = myThreadId;
+        }
+
+        @Override
+        public void run() {
+
+            for (int i=0 ; i<ITERATIONS ; i++) {
+                String sessionId = String.valueOf(i);
+                remoteCache.remove(sessionId);
+
+
+                logger.infof("Session %s removed on DC1", sessionId);
+
+                // Check if it's immediately seen that session is removed on 2nd DC
+                RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
+                SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
+                Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
+
+                // Also check that it's immediatelly removed on my DC
+                SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
+                Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
+            }
+
+        }
+
+    }
+
+}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
index 7101d38..dd34b19 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGSessionsCacheTest.java
@@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
  */
 public class ConcurrencyJDGSessionsCacheTest {
 
-    protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
+    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
 
     private static final int ITERATION_PER_WORKER = 1000;
 
@@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
         //return new CacheWorker(cache, threadId);
     }
 
-    private static EmbeddedCacheManager createManager(int threadId) {
-        System.setProperty("java.net.preferIPv4Stack", "true");
-        System.setProperty("jgroups.tcp.port", "53715");
-        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-
-        boolean clustered = false;
-        boolean async = false;
-        boolean allowDuplicateJMXDomains = true;
-
-        if (clustered) {
-            gcb = gcb.clusteredDefault();
-            gcb.transport().clusterName("test-clustering");
-        }
-
-        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
-
-        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
-
-        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
-
-        cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
-        return cacheManager;
 
+    private static EmbeddedCacheManager createManager(int threadId) {
+        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
     }
 
-    private static Configuration getCacheBackedByRemoteStore(int threadId) {
-        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
-
-        int port = threadId==1 ? 12232 : 13232;
-        //int port = 12232;
-
-        return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
-                .fetchPersistentState(false)
-                .ignoreModifications(false)
-                .purgeOnStartup(false)
-                .preload(false)
-                .shared(true)
-                .remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
-                .rawValues(true)
-                .forceReturnValues(false)
-                .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
-                .addServer()
-                    .host("localhost")
-                    .port(port)
-                .connectionPool()
-                    .maxActive(20)
-                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
-                .async()
-                    .enabled(false).build();
-    }
 
     @ClientListener
     public static class HotRodListener {
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
new file mode 100644
index 0000000..06dd95f
--- /dev/null
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.persistence.remote.configuration.ExhaustedAction;
+import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+class TestCacheManagerFactory {
+
+
+    <T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
+        System.setProperty("java.net.preferIPv4Stack", "true");
+        System.setProperty("jgroups.tcp.port", "53715");
+        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+        boolean clustered = false;
+        boolean async = false;
+        boolean allowDuplicateJMXDomains = true;
+
+        if (clustered) {
+            gcb = gcb.clusteredDefault();
+            gcb.transport().clusterName("test-clustering");
+        }
+
+        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
+
+        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
+
+        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
+
+        cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
+        return cacheManager;
+
+    }
+
+
+    private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
+        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
+
+        int port = threadId==1 ? 12232 : 13232;
+        //int port = 12232;
+
+        return cacheConfigBuilder.persistence().addStore(builderClass)
+                .fetchPersistentState(false)
+                .ignoreModifications(false)
+                .purgeOnStartup(false)
+                .preload(false)
+                .shared(true)
+                .remoteCacheName(cacheName)
+                .rawValues(true)
+                .forceReturnValues(false)
+                .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
+                .addServer()
+                    .host("localhost")
+                .port(port)
+                .connectionPool()
+                    .maxActive(20)
+                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
+                .async()
+                .   enabled(false).build();
+    }
+}
diff --git a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
index 11d44af..c06caa0 100755
--- a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
+++ b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java
@@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
 import org.keycloak.services.managers.AuthenticationManager;
 import org.keycloak.services.managers.AuthenticationSessionManager;
 import org.keycloak.services.managers.ClientSessionCode;
+import org.keycloak.services.managers.UserSessionCrossDCManager;
 import org.keycloak.services.messages.Messages;
 import org.keycloak.services.resources.LoginActionsService;
 import org.keycloak.services.util.CacheControlUtil;
@@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
             }
         }
 
-        UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
+        UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
 
         if (userSession != null) {
             logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);
diff --git a/services/src/main/java/org/keycloak/protocol/oidc/endpoints/TokenEndpoint.java b/services/src/main/java/org/keycloak/protocol/oidc/endpoints/TokenEndpoint.java
index 0d20597..fef17c6 100644
--- a/services/src/main/java/org/keycloak/protocol/oidc/endpoints/TokenEndpoint.java
+++ b/services/src/main/java/org/keycloak/protocol/oidc/endpoints/TokenEndpoint.java
@@ -386,6 +386,7 @@ public class TokenEndpoint {
             }
 
         } catch (OAuthErrorException e) {
+            logger.trace(e.getMessage(), e);
             event.error(Errors.INVALID_TOKEN);
             throw new ErrorResponseException(e.getError(), e.getDescription(), Response.Status.BAD_REQUEST);
         }
diff --git a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
index 1d1a3be..11795e5 100644
--- a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java
@@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
 
         });
     }
+
+
+    // Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
+    public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
+        UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
+
+        // This will remove userSession "locally" if it doesn't exists on remoteCache
+        kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
+            return userSession2 == null;
+        });
+
+        return kcSession.sessions().getUserSession(realm, id);
+    }
 }
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanCacheStatistics.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanCacheStatistics.java
index 2dd7bbc..ef15acf 100644
--- a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanCacheStatistics.java
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanCacheStatistics.java
@@ -19,6 +19,7 @@ package org.keycloak.testsuite.arquillian.annotation;
 import org.keycloak.testsuite.arquillian.AuthServerTestEnricher;
 import org.keycloak.testsuite.arquillian.InfinispanStatistics;
 import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
+import org.keycloak.testsuite.crossdc.DC;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -48,7 +49,7 @@ public @interface JmxInfinispanCacheStatistics {
     // Host address - either given by arrangement of DC ...
 
     /** Index of the data center, starting from 0 */
-    int dcIndex() default -1;
+    DC dc() default DC.UNDEFINED;
     /** Index of the node within data center, starting from 0. Nodes are ordered by arquillian qualifier as per {@link AuthServerTestEnricher} */
     int dcNodeIndex() default -1;
 
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanChannelStatistics.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanChannelStatistics.java
index 41e9f20..cddb815 100644
--- a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanChannelStatistics.java
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/annotation/JmxInfinispanChannelStatistics.java
@@ -17,6 +17,7 @@
 package org.keycloak.testsuite.arquillian.annotation;
 
 import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
+import org.keycloak.testsuite.crossdc.DC;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -40,7 +41,7 @@ public @interface JmxInfinispanChannelStatistics {
     // Host address - either given by arrangement of DC ...
 
     /** Index of the data center, starting from 0 */
-    int dcIndex() default -1;
+    DC dc() default DC.UNDEFINED;
     /** Index of the node within data center, starting from 0. Nodes are ordered by arquillian qualifier as per {@link AuthServerTestEnricher} */
     int dcNodeIndex() default -1;
 
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/CacheStatisticsControllerEnricher.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/CacheStatisticsControllerEnricher.java
index 4091ca4..a0c08cc 100644
--- a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/CacheStatisticsControllerEnricher.java
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/arquillian/CacheStatisticsControllerEnricher.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanChannelStatistics;
 import org.keycloak.testsuite.arquillian.jmx.JmxConnectorRegistry;
 import org.keycloak.testsuite.arquillian.undertow.KeycloakOnUndertow;
+import org.keycloak.testsuite.crossdc.DC;
 import java.io.NotSerializableException;
 import java.lang.management.ManagementFactory;
 import java.util.Objects;
@@ -84,7 +85,7 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
 
         ObjectName mbeanName = new ObjectName(String.format(
           "%s:type=%s,name=\"%s(%s)\",manager=\"%s\",component=%s",
-          annotation.domain().isEmpty() ? getDefaultDomain(annotation.dcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
+          annotation.domain().isEmpty() ? getDefaultDomain(annotation.dc().getDcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
           annotation.type(),
           annotation.cacheName(),
           annotation.cacheMode(),
@@ -98,8 +99,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
             try {
                 Retry.execute(() -> value.reset(), 2, 150);
             } catch (RuntimeException ex) {
-                if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1
-                   && suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
+                if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1
+                   && suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
                     LOG.warn("Could not reset statistics for " + mbeanName);
                 }
             }
@@ -113,7 +114,7 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
 
         ObjectName mbeanName = new ObjectName(String.format(
           "%s:type=%s,cluster=\"%s\"",
-          annotation.domain().isEmpty() ? getDefaultDomain(annotation.dcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
+          annotation.domain().isEmpty() ? getDefaultDomain(annotation.dc().getDcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
           annotation.type(),
           annotation.cluster()
         ));
@@ -124,8 +125,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
             try {
                 Retry.execute(() -> value.reset(), 2, 150);
             } catch (RuntimeException ex) {
-                if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1
-                   && suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
+                if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1
+                   && suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
                     LOG.warn("Could not reset statistics for " + mbeanName);
                 }
             }
@@ -170,8 +171,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
         final String host;
         final int port;
 
-        if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1) {
-            ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex());
+        if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1) {
+            ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex());
             Container container = node.getArquillianContainer();
             if (container.getDeployableContainer() instanceof KeycloakOnUndertow) {
                 return ManagementFactory.getPlatformMBeanServer();
@@ -204,8 +205,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
         final String host;
         final int port;
 
-        if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1) {
-            ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex());
+        if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1) {
+            ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex());
             Container container = node.getArquillianContainer();
             if (container.getDeployableContainer() instanceof KeycloakOnUndertow) {
                 return ManagementFactory.getPlatformMBeanServer();
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/crossdc/DC.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/crossdc/DC.java
new file mode 100644
index 0000000..1ed8cad
--- /dev/null
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/crossdc/DC.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.keycloak.testsuite.crossdc;
+
+/**
+ * Identifier of datacentre in the testsuite
+ * @author hmlnarik
+ */
+public enum DC {
+    FIRST,
+    SECOND,
+    UNDEFINED;
+
+    public int getDcIndex() {
+        return ordinal();
+    }
+}
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/Retry.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/Retry.java
index 0f1ac27..5b15a3f 100644
--- a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/Retry.java
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/Retry.java
@@ -17,19 +17,61 @@
 
 package org.keycloak.testsuite;
 
+import java.util.function.Supplier;
+
 /**
  * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
  */
 public class Retry {
 
-    public static void execute(Runnable runnable, int retryCount, long intervalMillis) {
+    /**
+     * Runs the given {@code runnable} at most {@code retryCount} times until it passes,
+     * leaving {@code intervalMillis} milliseconds between the invocations.
+     * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
+     * @param runnable
+     * @param attemptsCount Total number of attempts to execute the {@code runnable}
+     * @param intervalMillis
+     * @return Index of the first successful invocation, starting from 0.
+     */
+    public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
+        int executionIndex = 0;
         while (true) {
             try {
                 runnable.run();
-                return;
+                return executionIndex;
+            } catch (RuntimeException | AssertionError e) {
+                attemptsCount--;
+                executionIndex++;
+                if (attemptsCount > 0) {
+                    try {
+                        Thread.sleep(intervalMillis);
+                    } catch (InterruptedException ie) {
+                        ie.addSuppressed(e);
+                        throw new RuntimeException(ie);
+                    }
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Runs the given {@code runnable} at most {@code retryCount} times until it passes,
+     * leaving {@code intervalMillis} milliseconds between the invocations.
+     * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
+     * @param supplier
+     * @param attemptsCount Total number of attempts to execute the {@code runnable}
+     * @param intervalMillis
+     * @return Value generated by the {@code supplier}.
+     */
+    public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
+        while (true) {
+            try {
+                return supplier.get();
             } catch (RuntimeException | AssertionError e) {
-                retryCount--;
-                if (retryCount > 0) {
+                attemptsCount--;
+                if (attemptsCount > 0) {
                     try {
                         Thread.sleep(intervalMillis);
                     } catch (InterruptedException ie) {
diff --git a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/util/OAuthClient.java b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/util/OAuthClient.java
index d42158c..e577758 100644
--- a/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/util/OAuthClient.java
+++ b/testsuite/integration-arquillian/tests/base/src/main/java/org/keycloak/testsuite/util/OAuthClient.java
@@ -54,6 +54,7 @@ import org.keycloak.testsuite.arquillian.SuiteContext;
 import org.keycloak.util.BasicAuthHelper;
 import org.keycloak.util.JsonSerialization;
 import org.keycloak.util.TokenUtil;
+import com.google.common.base.Charsets;
 import org.openqa.selenium.By;
 import org.openqa.selenium.WebDriver;
 
@@ -203,6 +204,7 @@ public class OAuthClient {
     }
 
     public void fillLoginForm(String username, String password) {
+        WaitUtils.waitForPageToLoad(driver);
         String src = driver.getPageSource();
         try {
             driver.findElement(By.id("username")).sendKeys(username);
@@ -250,8 +252,7 @@ public class OAuthClient {
     }
 
     public AccessTokenResponse doAccessTokenRequest(String code, String password) {
-        CloseableHttpClient client = newCloseableHttpClient();
-        try {
+        try (CloseableHttpClient client = newCloseableHttpClient()) {
             HttpPost post = new HttpPost(getAccessTokenUrl());
 
             List<NameValuePair> parameters = new LinkedList<NameValuePair>();
@@ -283,12 +284,7 @@ public class OAuthClient {
                 parameters.add(new BasicNameValuePair(OAuth2Constants.CODE_VERIFIER, codeVerifier));
             }
 
-            UrlEncodedFormEntity formEntity = null;
-            try {
-                formEntity = new UrlEncodedFormEntity(parameters, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new RuntimeException(e);
-            }
+            UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters, Charsets.UTF_8);
             post.setEntity(formEntity);
 
             try {
@@ -296,8 +292,8 @@ public class OAuthClient {
             } catch (Exception e) {
                 throw new RuntimeException("Failed to retrieve access token", e);
             }
-        } finally {
-            closeClient(client);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
         }
     }
 
@@ -310,8 +306,7 @@ public class OAuthClient {
     }
 
     public String introspectTokenWithClientCredential(String clientId, String clientSecret, String tokenType, String tokenToIntrospect) {
-        CloseableHttpClient client = new DefaultHttpClient();
-        try {
+        try (CloseableHttpClient client = new DefaultHttpClient()) {
             HttpPost post = new HttpPost(getTokenIntrospectionUrl());
 
             String authorization = BasicAuthHelper.createHeader(clientId, clientSecret);
@@ -332,19 +327,16 @@ public class OAuthClient {
 
             post.setEntity(formEntity);
 
-            try {
+            try (CloseableHttpResponse response = client.execute(post)) {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
 
-                CloseableHttpResponse response = client.execute(post);
                 response.getEntity().writeTo(out);
-                response.close();
-
                 return new String(out.toByteArray());
             } catch (Exception e) {
                 throw new RuntimeException("Failed to retrieve access token", e);
             }
-        } finally {
-            closeClient(client);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
         }
     }
 
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/AbstractKeycloakTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/AbstractKeycloakTest.java
index 262d0b2..d2f7de6 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/AbstractKeycloakTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/AbstractKeycloakTest.java
@@ -144,6 +144,8 @@ public abstract class AbstractKeycloakTest {
             updateMasterAdminPassword();
         }
 
+        beforeAbstractKeycloakTestRealmImport();
+
         if (testContext.getTestRealmReps() == null) {
             importTestRealms();
 
@@ -155,6 +157,9 @@ public abstract class AbstractKeycloakTest {
         oauth.init(adminClient, driver);
     }
 
+    protected void beforeAbstractKeycloakTestRealmImport() throws Exception {
+    }
+
     @After
     public void afterAbstractKeycloakTest() {
         if (resetTimeOffset) {
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
index 5559a5e..fe20270 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/AbstractConcurrencyTest.java
@@ -76,7 +76,6 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
                     runnable.run(arrayIndex % numThreads, keycloaks.get(), keycloaks.get().realm(REALM_NAME));
                 } catch (Throwable ex) {
                     failures.add(ex);
-                    log.error(ex.getMessage(), ex);
                 }
                 return null;
             });
@@ -96,6 +95,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
         if (! failures.isEmpty()) {
             RuntimeException ex = new RuntimeException("There were failures in threads. Failures count: " + failures.size());
             failures.forEach(ex::addSuppressed);
+            failures.forEach(e -> log.error(e.getMessage(), e));
             throw ex;
         }
     }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
index 11e3bc0..ff6f10f 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/admin/concurrency/ConcurrentLoginTest.java
@@ -22,7 +22,6 @@ import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,11 +45,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.keycloak.OAuth2Constants;
+import org.keycloak.admin.client.Keycloak;
+import org.keycloak.admin.client.resource.ClientsResource;
+import org.keycloak.admin.client.resource.RealmResource;
 import org.keycloak.representations.AccessToken;
 import org.keycloak.representations.idm.ClientRepresentation;
+import org.keycloak.testsuite.Retry;
+import org.keycloak.testsuite.admin.ApiUtil;
+import org.keycloak.testsuite.util.ClientBuilder;
 import org.keycloak.testsuite.util.OAuthClient;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.http.client.CookieStore;
+import org.apache.http.impl.client.BasicCookieStore;
 import org.hamcrest.Matchers;
 
 
@@ -60,97 +69,95 @@ import org.hamcrest.Matchers;
  */
 public class ConcurrentLoginTest extends AbstractConcurrencyTest {
     
-    private static final int DEFAULT_THREADS = 10;
-    private static final int CLIENTS_PER_THREAD = 10;
-    private static final int DEFAULT_CLIENTS_COUNT = CLIENTS_PER_THREAD * DEFAULT_THREADS;
-    
+    protected static final int DEFAULT_THREADS = 4;
+    protected static final int CLIENTS_PER_THREAD = 30;
+    protected static final int DEFAULT_CLIENTS_COUNT = CLIENTS_PER_THREAD * DEFAULT_THREADS;
+
     @Before
     public void beforeTest() {
         createClients();
     }
 
     protected void createClients() {
+        final ClientsResource clients = adminClient.realm(REALM_NAME).clients();
         for (int i = 0; i < DEFAULT_CLIENTS_COUNT; i++) {
-            ClientRepresentation client = new ClientRepresentation();
-            client.setClientId("client" + i);
-            client.setDirectAccessGrantsEnabled(true);
-            client.setRedirectUris(Arrays.asList("http://localhost:8180/auth/realms/master/app/*"));
-            client.setWebOrigins(Arrays.asList("http://localhost:8180"));
-            client.setSecret("password");
-
-            log.debug("creating " + client.getClientId());
-            Response create = adminClient.realm("test").clients().create(client);
-            Assert.assertEquals(Response.Status.CREATED, create.getStatusInfo());
+            ClientRepresentation client = ClientBuilder.create()
+              .clientId("client" + i)
+              .directAccessGrants()
+              .redirectUris("http://localhost:8180/auth/realms/master/app/*")
+              .addWebOrigin("http://localhost:8180")
+              .secret("password")
+              .build();
+
+            Response create = clients.create(client);
+            String clientId = ApiUtil.getCreatedId(create);
             create.close();
+            getCleanup(REALM_NAME).addClientUuid(clientId);
+            log.debugf("created %s [uuid=%s]", client.getClientId(), clientId);
         }
         log.debug("clients created");
     }
 
     @Test
-    public void concurrentLogin() throws Throwable {
-        System.out.println("*********************************************");
+    public void concurrentLoginSingleUser() throws Throwable {
+        log.info("*********************************************");
         long start = System.currentTimeMillis();
 
         AtomicReference<String> userSessionId = new AtomicReference<>();
+        LoginTask loginTask = null;
 
         try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
+            loginTask = new LoginTask(httpClient, userSessionId, 100, 1, Arrays.asList(
+              createHttpClientContextForUser(httpClient, "test-user@localhost", "password")
+            ));
+            run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask);
+            int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
+            Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT, clientSessionsCount);
+        } finally {
+            long end = System.currentTimeMillis() - start;
+            log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
+            log.info("concurrentLoginSingleUser took " + (end/1000) + "s");
+            log.info("*********************************************");
+        }
+    }
 
-            HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, null), "test-user@localhost", "password");
-            
-            log.debug("Executing login request");
-            
-            Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request)).contains("<title>AUTH_RESPONSE</title>"));
-            AtomicInteger clientIndex = new AtomicInteger();
-            ThreadLocal<OAuthClient> oauthClient = new ThreadLocal<OAuthClient>() {
-                @Override
-                protected OAuthClient initialValue() {
-                    OAuthClient oauth1 = new OAuthClient();
-                    oauth1.init(adminClient, driver);
-                    return oauth1;
-                }
-            };
-
-            run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, (threadIndex, keycloak, realm) -> {
-                int i = clientIndex.getAndIncrement();
-                OAuthClient oauth1 = oauthClient.get();
-                oauth1.clientId("client" + i);
-                log.infof("%d [%s]: Accessing login page for %s", threadIndex, Thread.currentThread().getName(), oauth1.getClientId());
-
-                final HttpClientContext context = HttpClientContext.create();
-                String pageContent = getPageContent(oauth1.getLoginFormUrl(), httpClient, context);
-                String currentUrl = context.getRedirectLocations().get(0).toString();
-                Assert.assertThat(pageContent, Matchers.containsString("<title>AUTH_RESPONSE</title>"));
-                String code = getQueryFromUrl(currentUrl).get(OAuth2Constants.CODE);
+    protected HttpClientContext createHttpClientContextForUser(final CloseableHttpClient httpClient, String userName, String password) throws IOException {
+        final HttpClientContext context = HttpClientContext.create();
+        CookieStore cookieStore = new BasicCookieStore();
+        context.setCookieStore(cookieStore);
+        HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, context), userName, password);
+        log.debug("Executing login request");
+        Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request, context)).contains("<title>AUTH_RESPONSE</title>"));
+        return context;
+    }
 
-                OAuthClient.AccessTokenResponse accessRes = oauth1.doAccessTokenRequest(code, "password");
-                Assert.assertEquals("AccessTokenResponse: error: '" + accessRes.getError() + "' desc: '" + accessRes.getErrorDescription() + "'",
-                  200, accessRes.getStatusCode());
+    @Test
+    public void concurrentLoginMultipleUsers() throws Throwable {
+        log.info("*********************************************");
+        long start = System.currentTimeMillis();
 
-                OAuthClient.AccessTokenResponse refreshRes = oauth1.doRefreshTokenRequest(accessRes.getRefreshToken(), "password");
-                Assert.assertEquals("AccessTokenResponse: error: '" + refreshRes.getError() + "' desc: '" + refreshRes.getErrorDescription() + "'",
-                  200, refreshRes.getStatusCode());
+        AtomicReference<String> userSessionId = new AtomicReference<>();
+        LoginTask loginTask = null;
 
-                if (userSessionId.get() == null) {
-                    AccessToken token = oauth.verifyToken(accessRes.getAccessToken());
-                    userSessionId.set(token.getSessionState());
-                }
-            });
+        try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
+            loginTask = new LoginTask(httpClient, userSessionId, 100, 1, Arrays.asList(
+              createHttpClientContextForUser(httpClient, "test-user@localhost", "password"),
+              createHttpClientContextForUser(httpClient, "john-doh@localhost", "password"),
+              createHttpClientContextForUser(httpClient, "roleRichUser", "password")
+            ));
 
+            run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask);
             int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
-            Assert.assertEquals(clientSessionsCount, 1 + (DEFAULT_THREADS * CLIENTS_PER_THREAD));
+            Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT / 3 + (DEFAULT_CLIENTS_COUNT % 3 <= 0 ? 0 : 1), clientSessionsCount);
         } finally {
-            logStats(start);
+            long end = System.currentTimeMillis() - start;
+            log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
+            log.info("concurrentLoginMultipleUsers took " + (end/1000) + "s");
+            log.info("*********************************************");
         }
     }
 
-    protected void logStats(long start) {
-        long end = System.currentTimeMillis() - start;
-        log.info("concurrentLogin took " + (end/1000) + "s");
-        log.info("*********************************************");
-    }
-    
-    private String getPageContent(String url, CloseableHttpClient httpClient, HttpClientContext context) throws IOException {
-
+    protected String getPageContent(String url, CloseableHttpClient httpClient, HttpClientContext context) throws IOException {
         HttpGet request = new HttpGet(url);
 
         request.setHeader("User-Agent", "Mozilla/5.0");
@@ -158,15 +165,10 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
                 "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
         request.setHeader("Accept-Language", "en-US,en;q=0.5");
 
-        if (context != null) {
-            return parseAndCloseResponse(httpClient.execute(request, context));
-        } else {
-            return parseAndCloseResponse(httpClient.execute(request));
-        }
-
+        return parseAndCloseResponse(httpClient.execute(request, context));
     }
 
-    private String parseAndCloseResponse(CloseableHttpResponse response) {
+    protected String parseAndCloseResponse(CloseableHttpResponse response) {
         try {
             int responseCode = response.getStatusLine().getStatusCode();
             String resp = EntityUtils.toString(response.getEntity());
@@ -186,16 +188,15 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
             }
         }
     }
-    
-    private HttpUriRequest handleLogin(String html, String username, String password) throws UnsupportedEncodingException {
 
-        System.out.println("Extracting form's data...");
+    protected HttpUriRequest handleLogin(String html, String username, String password) throws UnsupportedEncodingException {
+        log.debug("Extracting form's data...");
 
         // Keycloak form id
         Element loginform = Jsoup.parse(html).getElementById("kc-form-login");
         String method = loginform.attr("method");
         String action = loginform.attr("action");
-        
+
         List<NameValuePair> paramList = new ArrayList<>();
 
         for (Element inputElement : loginform.getElementsByTag("input")) {
@@ -207,9 +208,9 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
                 paramList.add(new BasicNameValuePair(key, password));
             }
         }
-        
+
         boolean isPost = method != null && "post".equalsIgnoreCase(method);
-        
+
         if (isPost) {
             HttpPost req = new HttpPost(action);
 
@@ -226,8 +227,8 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
             throw new UnsupportedOperationException("not supported yet!");
         }
     }
-    
-    private Map<String, String> getQueryFromUrl(String url) throws URISyntaxException {
+
+    private static Map<String, String> getQueryFromUrl(String url) throws URISyntaxException {
         Map<String, String> m = new HashMap<>();
         List<NameValuePair> pairs = URLEncodedUtils.parse(new URI(url), "UTF-8");
         for (NameValuePair p : pairs) {
@@ -236,5 +237,98 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
         return m;
     }
 
+    public class LoginTask implements KeycloakRunnable {
+
+        private final AtomicInteger clientIndex = new AtomicInteger();
+        private final ThreadLocal<OAuthClient> oauthClient = new ThreadLocal<OAuthClient>() {
+                @Override
+                protected OAuthClient initialValue() {
+                    OAuthClient oauth1 = new OAuthClient();
+                    oauth1.init(adminClient, driver);
+                    return oauth1;
+                }
+            };
+
+        private final CloseableHttpClient httpClient;
+        private final AtomicReference<String> userSessionId;
+
+        private final int retryDelayMs;
+        private final int retryCount;
+        private final AtomicInteger[] retryHistogram;
+        private final AtomicInteger totalInvocations = new AtomicInteger();
+        private final List<HttpClientContext> clientContexts;
+
+        public LoginTask(CloseableHttpClient httpClient, AtomicReference<String> userSessionId, int retryDelayMs, int retryCount, List<HttpClientContext> clientContexts) {
+            this.httpClient = httpClient;
+            this.userSessionId = userSessionId;
+            this.retryDelayMs = retryDelayMs;
+            this.retryCount = retryCount;
+            this.retryHistogram = new AtomicInteger[retryCount];
+            for (int i = 0; i < retryHistogram.length; i ++) {
+                retryHistogram[i] = new AtomicInteger();
+            }
+            this.clientContexts = clientContexts;
+        }
+
+        @Override
+        public void run(int threadIndex, Keycloak keycloak, RealmResource realm) throws Throwable {
+            int i = clientIndex.getAndIncrement();
+            OAuthClient oauth1 = oauthClient.get();
+            oauth1.clientId("client" + i);
+            log.infof("%d [%s]: Accessing login page for %s", threadIndex, Thread.currentThread().getName(), oauth1.getClientId());
+
+            final HttpClientContext templateContext = clientContexts.get(i % clientContexts.size());
+            final HttpClientContext context = HttpClientContext.create();
+            context.setCookieStore(templateContext.getCookieStore());
+            String pageContent = getPageContent(oauth1.getLoginFormUrl(), httpClient, context);
+            Assert.assertThat(pageContent, Matchers.containsString("<title>AUTH_RESPONSE</title>"));
+            Assert.assertThat(context.getRedirectLocations(), Matchers.notNullValue());
+            Assert.assertThat(context.getRedirectLocations(), Matchers.not(Matchers.empty()));
+            String currentUrl = context.getRedirectLocations().get(0).toString();
+            String code = getQueryFromUrl(currentUrl).get(OAuth2Constants.CODE);
+
+            AtomicReference<OAuthClient.AccessTokenResponse> accessResRef = new AtomicReference<>();
+            totalInvocations.incrementAndGet();
+
+            // obtain access + refresh token via code-to-token flow
+            OAuthClient.AccessTokenResponse accessRes = oauth1.doAccessTokenRequest(code, "password");
+            Assert.assertEquals("AccessTokenResponse: client: " + oauth1.getClientId() + ", error: '" + accessRes.getError() + "' desc: '" + accessRes.getErrorDescription() + "'",
+              200, accessRes.getStatusCode());
+            accessResRef.set(accessRes);
+
+            // Refresh access + refresh token using refresh token
+            int invocationIndex = Retry.execute(() -> {
+                OAuthClient.AccessTokenResponse refreshRes = oauth1.doRefreshTokenRequest(accessResRef.get().getRefreshToken(), "password");
+                Assert.assertEquals("AccessTokenResponse: client: " + oauth1.getClientId() + ", error: '" + refreshRes.getError() + "' desc: '" + refreshRes.getErrorDescription() + "'",
+                  200, refreshRes.getStatusCode());
+            }, retryCount, retryDelayMs);
+
+            retryHistogram[invocationIndex].incrementAndGet();
+
+            if (userSessionId.get() == null) {
+                AccessToken token = oauth1.verifyToken(accessResRef.get().getAccessToken());
+                userSessionId.set(token.getSessionState());
+            }
+        }
+
+        public int getRetryDelayMs() {
+            return retryDelayMs;
+        }
+
+        public int getRetryCount() {
+            return retryCount;
+        }
+
+        public Map<Integer, Integer> getHistogram() {
+            Map<Integer, Integer> res = new LinkedHashMap<>(retryCount);
+            for (int i = 0; i < retryHistogram.length; i ++) {
+                AtomicInteger item = retryHistogram[i];
+
+                res.put(i * retryDelayMs, item.get());
+            }
+            return res;
+        }
+    }
+
     
 }
\ No newline at end of file
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/cluster/ConcurrentLoginClusterTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/cluster/ConcurrentLoginClusterTest.java
index 3c755f4..0986c17 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/cluster/ConcurrentLoginClusterTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/cluster/ConcurrentLoginClusterTest.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.jboss.arquillian.container.test.api.ContainerController;
 import org.jboss.arquillian.test.api.ArquillianResource;
-import org.junit.After;
 import org.junit.Before;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.representations.idm.RealmRepresentation;
@@ -75,9 +74,8 @@ public class ConcurrentLoginClusterTest extends ConcurrentLoginTest {
 
 
     @Override
-    protected void logStats(long start) {
-        super.logStats(start);
-
+    public void concurrentLoginSingleUser() throws Throwable {
+        super.concurrentLoginSingleUser();
         JGroupsStats stats = testingClient.testing().cache(InfinispanConnectionProvider.SESSION_CACHE_NAME).getJgroupsStats();
         log.info("JGroups statistics: " + stats.statsAsString());
     }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractAdminCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractAdminCrossDCTest.java
index 2ad3cc3..27fed71 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractAdminCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractAdminCrossDCTest.java
@@ -96,7 +96,7 @@ public abstract class AbstractAdminCrossDCTest extends AbstractCrossDCTest {
 
             Matcher<? super T> matcherInstance = matcherOnOldStat.apply(oldStat);
             assertThat(newStat, matcherInstance);
-        }, 5, 200);
+        }, 20, 200);
     }
 
     protected void assertStatistics(InfinispanStatistics stats, Runnable testedCode, BiConsumer<Map<String, Object>, Map<String, Object>> assertionOnStats) {
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractCrossDCTest.java
index cb21255..ec558bc 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/AbstractCrossDCTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
- *
+ * Abstract cross-data-centre test that defines primitives for handling cross-DC setup.
  * @author hmlnarik
  */
 public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest {
@@ -63,7 +63,7 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
     @Before
     public void enableOnlyFirstNodeInFirstDc() {
         this.loadBalancerCtrl.disableAllBackendNodes();
-        loadBalancerCtrl.enableBackendNodeByName(getAutomaticallyStartedBackendNodes(0)
+        loadBalancerCtrl.enableBackendNodeByName(getAutomaticallyStartedBackendNodes(DC.FIRST)
                         .findFirst()
                         .orElseThrow(() -> new IllegalStateException("No node is started automatically"))
                         .getQualifier()
@@ -84,7 +84,7 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
     }
 
     @Before
-    public void InitRESTClientsForStartedNodes() {
+    public void initRESTClientsForStartedNodes() {
         log.debug("Init REST clients for automatically started nodes");
         this.suiteContext.getDcAuthServerBackendsInfo().stream()
                 .flatMap(List::stream)
@@ -188,7 +188,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * Disables routing requests to the given data center in the load balancer.
      * @param dcIndex
      */
-    public void disableDcOnLoadBalancer(int dcIndex) {
+    public void disableDcOnLoadBalancer(DC dc) {
+        int dcIndex = dc.ordinal();
         log.infof("Disabling load balancer for dc=%d", dcIndex);
         this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).forEach(c -> loadBalancerCtrl.disableBackendNodeByName(c.getQualifier()));
     }
@@ -197,7 +198,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * Enables routing requests to all started nodes to the given data center in the load balancer.
      * @param dcIndex
      */
-    public void enableDcOnLoadBalancer(int dcIndex) {
+    public void enableDcOnLoadBalancer(DC dc) {
+        int dcIndex = dc.ordinal();
         log.infof("Enabling load balancer for dc=%d", dcIndex);
         final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
         if (! dcNodes.stream().anyMatch(ContainerInfo::isStarted)) {
@@ -214,7 +216,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param dcIndex
      * @param nodeIndex
      */
-    public void disableLoadBalancerNode(int dcIndex, int nodeIndex) {
+    public void disableLoadBalancerNode(DC dc, int nodeIndex) {
+        int dcIndex = dc.ordinal();
         log.infof("Disabling load balancer for dc=%d, node=%d", dcIndex, nodeIndex);
         loadBalancerCtrl.disableBackendNodeByName(this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).get(nodeIndex).getQualifier());
     }
@@ -224,7 +227,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param dcIndex
      * @param nodeIndex
      */
-    public void enableLoadBalancerNode(int dcIndex, int nodeIndex) {
+    public void enableLoadBalancerNode(DC dc, int nodeIndex) {
+        int dcIndex = dc.ordinal();
         log.infof("Enabling load balancer for dc=%d, node=%d", dcIndex, nodeIndex);
         final ContainerInfo backendNode = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).get(nodeIndex);
         if (backendNode == null) {
@@ -242,7 +246,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param nodeIndex
      * @return Started instance descriptor.
      */
-    public ContainerInfo startBackendNode(int dcIndex, int nodeIndex) {
+    public ContainerInfo startBackendNode(DC dc, int nodeIndex) {
+        int dcIndex = dc.ordinal();
         assertThat((Integer) dcIndex, lessThan(this.suiteContext.getDcAuthServerBackendsInfo().size()));
         final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
         assertThat((Integer) nodeIndex, lessThan(dcNodes.size()));
@@ -261,7 +266,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param nodeIndex
      * @return Stopped instance descriptor.
      */
-    public ContainerInfo stopBackendNode(int dcIndex, int nodeIndex) {
+    public ContainerInfo stopBackendNode(DC dc, int nodeIndex) {
+        int dcIndex = dc.ordinal();
         assertThat((Integer) dcIndex, lessThan(this.suiteContext.getDcAuthServerBackendsInfo().size()));
         final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
         assertThat((Integer) nodeIndex, lessThan(dcNodes.size()));
@@ -279,7 +285,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param dcIndex
      * @return
      */
-    public Stream<ContainerInfo> getManuallyStartedBackendNodes(int dcIndex) {
+    public Stream<ContainerInfo> getManuallyStartedBackendNodes(DC dc) {
+        int dcIndex = dc.ordinal();
         final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
         return dcNodes.stream().filter(ContainerInfo::isManual);
     }
@@ -289,7 +296,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest 
      * @param dcIndex
      * @return
      */
-    public Stream<ContainerInfo> getAutomaticallyStartedBackendNodes(int dcIndex) {
+    public Stream<ContainerInfo> getAutomaticallyStartedBackendNodes(DC dc) {
+        int dcIndex = dc.ordinal();
         final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
         return dcNodes.stream().filter(c -> ! c.isManual());
     }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ActionTokenCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ActionTokenCrossDCTest.java
index 972be31..1a4e079 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ActionTokenCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ActionTokenCrossDCTest.java
@@ -41,6 +41,7 @@ import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanCacheStatistics
 import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanChannelStatistics;
 import org.keycloak.testsuite.arquillian.InfinispanStatistics;
 import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
+import org.keycloak.testsuite.pages.ProceedPage;
 import java.util.concurrent.TimeUnit;
 import org.hamcrest.Matchers;
 import static org.hamcrest.Matchers.greaterThan;
@@ -59,6 +60,9 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
     protected LoginPasswordUpdatePage passwordUpdatePage;
 
     @Page
+    protected ProceedPage proceedPage;
+
+    @Page
     protected ErrorPage errorPage;
 
     private String createUser(UserRepresentation userRep) {
@@ -73,11 +77,11 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
 
     @Test
     public void sendResetPasswordEmailSuccessWorksInCrossDc(
-      @JmxInfinispanCacheStatistics(dcIndex=0, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node0Statistics,
-      @JmxInfinispanCacheStatistics(dcIndex=0, dcNodeIndex=1, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node1Statistics,
-      @JmxInfinispanCacheStatistics(dcIndex=1, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc1Node0Statistics,
+      @JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node0Statistics,
+      @JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=1, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node1Statistics,
+      @JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc1Node0Statistics,
       @JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
-        startBackendNode(0, 1);
+        startBackendNode(DC.FIRST, 1);
         cacheDc0Node1Statistics.waitToBecomeAvailable(10, TimeUnit.SECONDS);
 
         Comparable originalNumberOfEntries = cacheDc0Node0Statistics.getSingleStatistics(Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
@@ -107,6 +111,8 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
           Matchers::is
         );
 
+        proceedPage.assertCurrent();
+        proceedPage.clickProceedLink();
         passwordUpdatePage.assertCurrent();
 
         // Verify that there was at least one message sent via the channel
@@ -120,8 +126,8 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
         // Verify that there was an action token added in the node which was targetted by the link
         assertThat(cacheDc0Node0Statistics.getSingleStatistics(Constants.STAT_CACHE_NUMBER_OF_ENTRIES), greaterThan(originalNumberOfEntries));
 
-        disableDcOnLoadBalancer(0);
-        enableDcOnLoadBalancer(1);
+        disableDcOnLoadBalancer(DC.FIRST);
+        enableDcOnLoadBalancer(DC.SECOND);
 
         // Make sure that after going to the link, the invalidated action token has been retrieved from Infinispan server cluster in the other DC
         assertSingleStatistics(cacheDc1Node0Statistics, Constants.STAT_CACHE_NUMBER_OF_ENTRIES,
@@ -134,7 +140,7 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
 
     @Test
     public void sendResetPasswordEmailAfterNewNodeAdded() throws IOException, MessagingException {
-        disableDcOnLoadBalancer(1);
+        disableDcOnLoadBalancer(DC.SECOND);
 
         UserRepresentation userRep = new UserRepresentation();
         userRep.setEnabled(true);
@@ -156,14 +162,16 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
 
         driver.navigate().to(link);
 
+        proceedPage.assertCurrent();
+        proceedPage.clickProceedLink();
         passwordUpdatePage.assertCurrent();
 
         passwordUpdatePage.changePassword("new-pass", "new-pass");
 
         assertEquals("Your account has been updated.", driver.getTitle());
 
-        disableDcOnLoadBalancer(0);
-        getManuallyStartedBackendNodes(1)
+        disableDcOnLoadBalancer(DC.FIRST);
+        getManuallyStartedBackendNodes(DC.SECOND)
           .findFirst()
           .ifPresent(c -> {
               containerController.start(c.getQualifier());
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ConcurrentLoginCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ConcurrentLoginCrossDCTest.java
index fde1285..b710943 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ConcurrentLoginCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/ConcurrentLoginCrossDCTest.java
@@ -17,18 +17,26 @@
 
 package org.keycloak.testsuite.crossdc;
 
-import java.util.LinkedList;
+import org.keycloak.admin.client.Keycloak;
+import org.keycloak.admin.client.resource.RealmResource;
 import java.util.List;
 
 import org.jboss.arquillian.container.test.api.ContainerController;
 import org.jboss.arquillian.test.api.ArquillianResource;
-import org.junit.Before;
-import org.keycloak.representations.idm.RealmRepresentation;
-import org.keycloak.testsuite.Assert;
 import org.keycloak.testsuite.admin.concurrency.ConcurrentLoginTest;
 import org.keycloak.testsuite.arquillian.ContainerInfo;
 import org.keycloak.testsuite.arquillian.LoadBalancerController;
 import org.keycloak.testsuite.arquillian.annotation.LoadBalancer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.LaxRedirectStrategy;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -42,42 +50,64 @@ public class ConcurrentLoginCrossDCTest extends ConcurrentLoginTest {
     @ArquillianResource
     protected ContainerController containerController;
 
+    private static final int INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE = 10;
+    private static final int LOGIN_TASK_DELAY_MS = 100;
+    private static final int LOGIN_TASK_RETRIES = 15;
 
-    // Need to postpone that
     @Override
-    public void addTestRealms(List<RealmRepresentation> testRealms) {
-    }
-
-
-    @Before
-    @Override
-    public void beforeTest() {
-        log.debug("Initializing load balancer - only enabling started nodes in the first DC");
+    public void beforeAbstractKeycloakTestRealmImport() {
+        log.debug("Initializing load balancer - enabling all started nodes across DCs");
         this.loadBalancerCtrl.disableAllBackendNodes();
 
-        // This should enable only the started nodes in first datacenter
-        this.suiteContext.getDcAuthServerBackendsInfo().get(0).stream()
+        this.suiteContext.getDcAuthServerBackendsInfo().stream()
+                .flatMap(List::stream)
                 .filter(ContainerInfo::isStarted)
                 .map(ContainerInfo::getQualifier)
                 .forEach(loadBalancerCtrl::enableBackendNodeByName);
+    }
 
-        this.suiteContext.getDcAuthServerBackendsInfo().get(1).stream()
-                .filter(ContainerInfo::isStarted)
-                .map(ContainerInfo::getQualifier)
-                .forEach(loadBalancerCtrl::enableBackendNodeByName);
+    @Test
+    public void concurrentLoginWithRandomDcFailures() throws Throwable {
+        log.info("*********************************************");
+        long start = System.currentTimeMillis();
+
+        AtomicReference<String> userSessionId = new AtomicReference<>();
+        LoginTask loginTask = null;
+
+        try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
+            loginTask = new LoginTask(httpClient, userSessionId, LOGIN_TASK_DELAY_MS, LOGIN_TASK_RETRIES, Arrays.asList(
+              createHttpClientContextForUser(httpClient, "test-user@localhost", "password")
+            ));
+            HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, HttpClientContext.create()), "test-user@localhost", "password");
+            log.debug("Executing login request");
+            org.junit.Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request)).contains("<title>AUTH_RESPONSE</title>"));
+
+            run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask, new SwapDcAvailability());
+            int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
+            org.junit.Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT, clientSessionsCount);
+        } finally {
+            long end = System.currentTimeMillis() - start;
+            log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
+            log.info("concurrentLoginWithRandomDcFailures took " + (end/1000) + "s");
+            log.info("*********************************************");
+        }
+    }
 
+    private class SwapDcAvailability implements KeycloakRunnable {
 
+        private final AtomicInteger invocationCounter = new AtomicInteger();
 
-        // Import realms
-        log.info("Importing realms");
-        List<RealmRepresentation> testRealms = new LinkedList<>();
-        super.addTestRealms(testRealms);
-        for (RealmRepresentation testRealm : testRealms) {
-            importRealm(testRealm);
+        @Override
+        public void run(int threadIndex, Keycloak keycloak, RealmResource realm) throws Throwable {
+            final int currentInvocarion = invocationCounter.getAndIncrement();
+            if (currentInvocarion % INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE == 0) {
+                int failureIndex = currentInvocarion / INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE;
+                int dcToEnable = failureIndex % 2;
+                int dcToDisable = (failureIndex + 1) % 2;
+                suiteContext.getDcAuthServerBackendsInfo().get(dcToDisable).forEach(c -> loadBalancerCtrl.disableBackendNodeByName(c.getQualifier()));
+                suiteContext.getDcAuthServerBackendsInfo().get(dcToEnable).forEach(c -> loadBalancerCtrl.enableBackendNodeByName(c.getQualifier()));
+            }
         }
-        log.info("Realms imported");
-
-        // Finally create clients
-        createClients();
     }
+
 }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LastSessionRefreshCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LastSessionRefreshCrossDCTest.java
index ec2007e..69ed3bc 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LastSessionRefreshCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LastSessionRefreshCrossDCTest.java
@@ -43,7 +43,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
         testRealm().update(realmRep);
 
         // Enable second DC
-        enableDcOnLoadBalancer(1);
+        enableDcOnLoadBalancer(DC.SECOND);
 
         // Login
         OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
@@ -68,7 +68,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
         setTimeOffset(10);
 
         // refresh token on DC0
-        disableDcOnLoadBalancer(1);
+        disableDcOnLoadBalancer(DC.SECOND);
         tokenResponse = oauth.doRefreshTokenRequest(refreshToken1, "password");
         String refreshToken2 = tokenResponse.getRefreshToken();
 
@@ -85,8 +85,8 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
         }, 50, 50);
 
         // try refresh with old token on DC1. It should fail.
-        disableDcOnLoadBalancer(0);
-        enableDcOnLoadBalancer(1);
+        disableDcOnLoadBalancer(DC.FIRST);
+        enableDcOnLoadBalancer(DC.SECOND);
         tokenResponse = oauth.doRefreshTokenRequest(refreshToken1, "password");
         Assert.assertNull(tokenResponse.getAccessToken());
         Assert.assertNotNull(tokenResponse.getError());
@@ -106,7 +106,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
     @Test
     public void testLastSessionRefreshUpdate() {
         // Disable DC1 on loadbalancer
-        disableDcOnLoadBalancer(1);
+        disableDcOnLoadBalancer(DC.SECOND);
 
         // Get statistics
         int stores0 = getRemoteCacheStats(0).getGlobalStores();
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
index afcbf19..c0eb849 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java
@@ -22,6 +22,7 @@ import javax.ws.rs.core.Response;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.junit.Test;
 import org.keycloak.testsuite.Assert;
+import org.keycloak.testsuite.Retry;
 import org.keycloak.testsuite.util.Matchers;
 import org.keycloak.testsuite.util.OAuthClient;
 
@@ -34,12 +35,11 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
 
     @Test
     public void loginTest() throws Exception {
-        log.info("Started to sleep");
-
-        enableDcOnLoadBalancer(1);
+        enableDcOnLoadBalancer(DC.SECOND);
 
+        //log.info("Started to sleep");
         //Thread.sleep(10000000);
-        for (int i=0 ; i<10 ; i++) {
+        for (int i=0 ; i<30 ; i++) {
             OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
             String code = response1.getCode();
             OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");
diff --git a/testsuite/integration-arquillian/tests/base/src/test/resources/log4j.properties b/testsuite/integration-arquillian/tests/base/src/test/resources/log4j.properties
index 8533f9c..8f74373 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/resources/log4j.properties
+++ b/testsuite/integration-arquillian/tests/base/src/test/resources/log4j.properties
@@ -18,8 +18,9 @@
 log4j.rootLogger=info, keycloak
 
 log4j.appender.keycloak=org.apache.log4j.ConsoleAppender
-log4j.appender.keycloak.layout=org.apache.log4j.PatternLayout
-log4j.appender.keycloak.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] %m%n
+log4j.appender.keycloak.layout=org.apache.log4j.EnhancedPatternLayout
+keycloak.testsuite.logging.pattern=%d{HH:mm:ss,SSS} %-5p [%c] %m%n
+log4j.appender.keycloak.layout.ConversionPattern=${keycloak.testsuite.logging.pattern}
 
 # Logging with "info" when running test from IDE, but disabled when running test with "mvn" . Both cases can be overriden by use system property "keycloak.logging.level" (eg. -Dkeycloak.logging.level=debug )
 log4j.logger.org.keycloak=${keycloak.logging.level:info}
diff --git a/testsuite/integration-arquillian/tests/pom.xml b/testsuite/integration-arquillian/tests/pom.xml
index 3906c87..55292cb 100755
--- a/testsuite/integration-arquillian/tests/pom.xml
+++ b/testsuite/integration-arquillian/tests/pom.xml
@@ -85,6 +85,7 @@
         <keycloak.connectionsInfinispan.remoteStorePort>12232</keycloak.connectionsInfinispan.remoteStorePort>
         <keycloak.connectionsInfinispan.remoteStorePort.2>13232</keycloak.connectionsInfinispan.remoteStorePort.2>
         <keycloak.connectionsJpa.url.crossdc>jdbc:h2:mem:test-dc-shared</keycloak.connectionsJpa.url.crossdc>
+        <keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} %-5p [%c] %m%n</keycloak.testsuite.logging.pattern>
 
         <adapter.test.props/>
         <migration.import.properties/>
@@ -284,6 +285,7 @@
                             <keycloak.connectionsInfinispan.remoteStorePort>${keycloak.connectionsInfinispan.remoteStorePort}</keycloak.connectionsInfinispan.remoteStorePort>
                             <keycloak.connectionsInfinispan.remoteStorePort.2>${keycloak.connectionsInfinispan.remoteStorePort.2}</keycloak.connectionsInfinispan.remoteStorePort.2>
                             <keycloak.connectionsInfinispan.remoteStoreServer>${keycloak.connectionsInfinispan.remoteStoreServer}</keycloak.connectionsInfinispan.remoteStoreServer>
+                            <keycloak.testsuite.logging.pattern>${keycloak.testsuite.logging.pattern}</keycloak.testsuite.logging.pattern>
 
                             <keycloak.connectionsJpa.url.crossdc>${keycloak.connectionsJpa.url.crossdc}</keycloak.connectionsJpa.url.crossdc>
                         </systemPropertyVariables>
@@ -386,6 +388,7 @@
                 <auth.server.crossdc>true</auth.server.crossdc>
                 <cache.server.jboss>true</cache.server.jboss>
                 <cache.server.config.dir>${cache.server.home}/standalone/configuration</cache.server.config.dir>                
+                <keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
             </properties>
             <dependencies>
                 <dependency>
@@ -460,6 +463,7 @@
                 <auth.server.crossdc>true</auth.server.crossdc>
                 <cache.server.jboss>true</cache.server.jboss>
                 <cache.server.config.dir>${cache.server.home}/standalone/configuration</cache.server.config.dir>
+                <keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
             </properties>
             <dependencies>
                 <dependency>
@@ -584,6 +588,8 @@
                 <auth.server.backend2.home>${containers.home}/auth-server-${auth.server}-backend2</auth.server.backend2.home>
 
                 <auth.server.config.dir>${auth.server.backend1.home}/standalone/configuration</auth.server.config.dir>
+
+                <keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
             </properties>
             <build>
                 <plugins>