keycloak-aplcache

KEYCLOAK-7275 KEYCLOAK-5479 Faster offline sessions preloading

11/1/2018 9:54:32 AM

Changes

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
index 6551bf8..a76639d 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
@@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.keycloak.models.AuthenticatedClientSessionModel;
 import org.keycloak.models.ClientModel;
@@ -33,7 +32,7 @@ import org.keycloak.models.sessions.infinispan.changes.ClientSessionUpdateTask;
 import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
 import org.keycloak.models.sessions.infinispan.changes.Tasks;
 import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
 import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import java.util.UUID;
@@ -150,7 +149,7 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
 
             @Override
             public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<AuthenticatedClientSessionEntity> sessionWrapper) {
-                return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
+                return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
                         .shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
             }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java
new file mode 100644
index 0000000..81669d9
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.models.KeycloakSession;
+
+/**
+ * Cross-DC based CrossDCLastSessionRefreshStore
+ *
+ * Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update
+ * lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class CrossDCLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
+
+    protected static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshStore.class);
+
+    private final String eventKey;
+
+    protected CrossDCLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+        super(maxIntervalBetweenMessagesSeconds, maxCount);
+        this.eventKey = eventKey;
+    }
+
+
+    protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
+        LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
+
+        if (logger.isDebugEnabled()) {
+            logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
+        }
+
+        // Don't notify local DC about the lastSessionRefreshes. They were processed here already
+        ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
+        cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC);
+    }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java
new file mode 100644
index 0000000..b0d5702
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import org.infinispan.Cache;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class CrossDCLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
+
+    // Name of periodic tasks to send events to the other DCs
+    public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes";
+    public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline";
+
+
+    public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
+        return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
+    }
+
+
+    public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache,
+                                                        long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+        String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME :  LSR_PERIODIC_TASK_NAME;
+        CrossDCLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
+
+        // Register listener
+        ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
+        cluster.registerListener(eventKey, new CrossDCLastSessionRefreshListener(kcSession, cache, offline));
+
+        // Setup periodic timer check
+        setupPeriodicTimer(kcSession, store, timerIntervalMs, eventKey);
+
+        return store;
+    }
+
+
+    protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+        return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
+    }
+
+
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java
new file mode 100644
index 0000000..cecc417
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.jboss.logging.Logger;
+import org.keycloak.common.util.Time;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.RealmModel;
+import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.SessionTimeoutHelper;
+
+/**
+ * The store is supposed to do periodic bulk update of lastSessionRefresh times of all userSessions, which were refreshed during some period
+ * of time. The updates are sent to UserSessionPersisterProvider (DB)
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class PersisterLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
+
+    protected static final Logger logger = Logger.getLogger(PersisterLastSessionRefreshStore.class);
+
+    private final boolean offline;
+
+    protected PersisterLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+        super(maxIntervalBetweenMessagesSeconds, maxCount);
+        this.offline = offline;
+    }
+
+
+    protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
+        Map<String, Set<String>> sessionIdsByRealm =
+                refreshesToSend.entrySet().stream().collect(
+                        Collectors.groupingBy(entry -> entry.getValue().getRealmId(),
+                                Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
+
+        // Update DB with a bit lower value than current time to ensure 'revokeRefreshToken' will work correctly taking server
+        int lastSessionRefresh = Time.currentTime() - SessionTimeoutHelper.PERIODIC_TASK_INTERVAL_SECONDS;
+
+        if (logger.isDebugEnabled()) {
+            logger.debugf("Updating %d userSessions with lastSessionRefresh: %d", refreshesToSend.size(), lastSessionRefresh);
+        }
+
+        UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
+
+        for (Map.Entry<String, Set<String>> entry : sessionIdsByRealm.entrySet()) {
+            RealmModel realm = kcSession.realms().getRealm(entry.getKey());
+
+            // Case when realm was deleted in the meantime. UserSessions were already deleted as well (callback for realm deletion)
+            if (realm == null) {
+                continue;
+            }
+
+            Set<String> userSessionIds = entry.getValue();
+
+            persister.updateLastSessionRefreshes(realm, lastSessionRefresh, userSessionIds, offline);
+        }
+    }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java
new file mode 100644
index 0000000..d720ef8
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import org.infinispan.Cache;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class PersisterLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
+
+    public PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession, boolean offline) {
+        return createAndInit(kcSession, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
+    }
+
+
+    private PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession,
+                                                          long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+        PersisterLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, offline);
+
+        // Setup periodic timer check
+        setupPeriodicTimer(kcSession, store, timerIntervalMs, "db-last-session-refresh");
+
+        return store;
+    }
+
+
+    protected PersisterLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+        return new PersisterLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, offline);
+    }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index bf7b04f..b1429a6 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -19,10 +19,13 @@ package org.keycloak.models.sessions.infinispan;
 
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.context.Flag;
 import org.infinispan.stream.CacheCollectors;
 import org.jboss.logging.Logger;
 import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.common.util.ObjectUtil;
+import org.keycloak.common.util.Retry;
 import org.keycloak.common.util.Time;
 import org.keycloak.models.AuthenticatedClientSessionModel;
 import org.keycloak.models.ClientModel;
@@ -35,7 +38,8 @@ import org.keycloak.models.UserSessionModel;
 import org.keycloak.models.UserSessionProvider;
 import org.keycloak.models.session.UserSessionPersisterProvider;
 import org.keycloak.models.sessions.infinispan.changes.Tasks;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
@@ -61,6 +65,8 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 import org.keycloak.models.utils.SessionTimeoutHelper;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -69,11 +75,11 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -100,15 +106,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
     protected final SessionEventsSenderTransaction clusterEventsSenderTx;
 
-    protected final LastSessionRefreshStore lastSessionRefreshStore;
-    protected final LastSessionRefreshStore offlineLastSessionRefreshStore;
+    protected final CrossDCLastSessionRefreshStore lastSessionRefreshStore;
+    protected final CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
+    protected final PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
 
+    protected final RemoteCacheInvoker remoteCacheInvoker;
     protected final InfinispanKeyGenerator keyGenerator;
 
     public InfinispanUserSessionProvider(KeycloakSession session,
                                          RemoteCacheInvoker remoteCacheInvoker,
-                                         LastSessionRefreshStore lastSessionRefreshStore,
-                                         LastSessionRefreshStore offlineLastSessionRefreshStore,
+                                         CrossDCLastSessionRefreshStore lastSessionRefreshStore,
+                                         CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore,
+                                         PersisterLastSessionRefreshStore persisterLastSessionRefreshStore,
                                          InfinispanKeyGenerator keyGenerator,
                                          Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
                                          Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
@@ -134,6 +143,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
         this.lastSessionRefreshStore = lastSessionRefreshStore;
         this.offlineLastSessionRefreshStore = offlineLastSessionRefreshStore;
+        this.persisterLastSessionRefreshStore = persisterLastSessionRefreshStore;
+        this.remoteCacheInvoker = remoteCacheInvoker;
         this.keyGenerator = keyGenerator;
 
         session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
@@ -160,14 +171,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         return offline ? offlineClientSessionTx : clientSessionTx;
     }
 
-    protected LastSessionRefreshStore getLastSessionRefreshStore() {
+    protected CrossDCLastSessionRefreshStore getLastSessionRefreshStore() {
         return lastSessionRefreshStore;
     }
 
-    protected LastSessionRefreshStore getOfflineLastSessionRefreshStore() {
+    protected CrossDCLastSessionRefreshStore getOfflineLastSessionRefreshStore() {
         return offlineLastSessionRefreshStore;
     }
 
+    protected PersisterLastSessionRefreshStore getPersisterLastSessionRefreshStore() {
+        return persisterLastSessionRefreshStore;
+    }
+
     @Override
     public AuthenticatedClientSessionModel createClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession) {
         final UUID clientSessionId = keyGenerator.generateKeyUUID(session, clientSessionCache);
@@ -535,7 +550,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
     }
 
     private void removeExpiredOfflineUserSessions(RealmModel realm) {
-        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
         int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
 
         // Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
@@ -570,8 +584,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
                             futures.addTask(f);
                         });
 
-                        // TODO:mposolda can be likely optimized to delete all expired at one step
-                        persister.removeUserSession( userSessionEntity.getId(), true);
                     }
                 });
 
@@ -796,7 +808,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
     @Override
     public UserSessionModel createOfflineUserSession(UserSessionModel userSession) {
-        UserSessionAdapter offlineUserSession = importUserSession(userSession, true, false);
+        UserSessionAdapter offlineUserSession = importUserSession(userSession, true);
 
         // started and lastSessionRefresh set to current time
         int currentTime = Time.currentTime();
@@ -866,8 +878,117 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         return getUserSessions(realm, client, first, max, true);
     }
 
+
     @Override
-    public UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline, boolean importAuthenticatedClientSessions) {
+    public void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {
+        if (persistentUserSessions == null || persistentUserSessions.isEmpty()) {
+            return;
+        }
+
+        Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsById = new HashMap<>();
+
+        Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsById = persistentUserSessions.stream()
+                .map((UserSessionModel persistentUserSession) -> {
+
+                    UserSessionEntity userSessionEntityToImport = createUserSessionEntityInstance(persistentUserSession);
+
+                    for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
+                        String clientUUID = entry.getKey();
+                        AuthenticatedClientSessionModel clientSession = entry.getValue();
+                        AuthenticatedClientSessionEntity clientSessionToImport = createAuthenticatedClientSessionInstance(clientSession, userSessionEntityToImport.getRealmId(), offline);
+
+                        // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
+                        clientSessionToImport.setTimestamp(userSessionEntityToImport.getLastSessionRefresh());
+
+                        clientSessionsById.put(clientSessionToImport.getId(), new SessionEntityWrapper<>(clientSessionToImport));
+
+                        // Update userSession entity with the clientSession
+                        AuthenticatedClientSessionStore clientSessions = userSessionEntityToImport.getAuthenticatedClientSessions();
+                        clientSessions.put(clientUUID, clientSessionToImport.getId());
+                    }
+
+                    return userSessionEntityToImport;
+                })
+                .map(SessionEntityWrapper::new)
+                .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+        // Directly put all entities to the infinispan cache
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoaders(getCache(offline));
+        cache.putAll(sessionsById);
+
+        // put all entities to the remoteCache (if exists)
+        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
+        if (remoteCache != null) {
+            Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsByIdForTransport = sessionsById.values().stream()
+                    .map(SessionEntityWrapper::forTransport)
+                    .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+            Retry.executeWithBackoff((int iteration) -> {
+
+                try {
+                    remoteCache.putAll(sessionsByIdForTransport);
+                } catch (HotRodClientException re) {
+                    if (log.isDebugEnabled()) {
+                        log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
+                                sessionsByIdForTransport.size(), iteration);
+                    }
+
+                    // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
+                    throw re;
+                }
+
+            }, 10, 10);
+        }
+
+        // Import client sessions
+        Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache;
+        clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache);
+
+        clientSessCache.putAll(clientSessionsById);
+
+        // put all entities to the remoteCache (if exists)
+        RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
+        if (remoteCacheClientSessions != null) {
+            Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> sessionsByIdForTransport = clientSessionsById.values().stream()
+                    .map(SessionEntityWrapper::forTransport)
+                    .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+            Retry.executeWithBackoff((int iteration) -> {
+
+                try {
+                    remoteCacheClientSessions.putAll(sessionsByIdForTransport);
+                } catch (HotRodClientException re) {
+                    if (log.isDebugEnabled()) {
+                        log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task",
+                                sessionsByIdForTransport.size(), iteration);
+                    }
+
+                    // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
+                    throw re;
+                }
+
+            }, 10, 10);
+        }
+    }
+
+
+    // Imports just userSession without it's clientSessions
+    protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) {
+        UserSessionEntity entity = createUserSessionEntityInstance(userSession);
+
+        InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
+        InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
+
+        SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
+        userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
+
+        UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
+
+        return importedSession;
+    }
+
+
+    private UserSessionEntity createUserSessionEntityInstance(UserSessionModel userSession) {
         UserSessionEntity entity = new UserSessionEntity();
         entity.setId(userSession.getId());
         entity.setRealmId(userSession.getRealm().getId());
@@ -896,22 +1017,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         entity.setStarted(userSession.getStarted());
         entity.setLastSessionRefresh(userSession.getLastSessionRefresh());
 
-        InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
-        InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
-
-        SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
-        userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
-
-        UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
-
-        // Handle client sessions
-        if (importAuthenticatedClientSessions) {
-            for (AuthenticatedClientSessionModel clientSession : userSession.getAuthenticatedClientSessions().values()) {
-                importClientSession(importedSession, clientSession, userSessionUpdateTx, clientSessionUpdateTx, offline);
-            }
-        }
-
-        return importedSession;
+        return entity;
     }
 
 
@@ -919,16 +1025,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
                                                                   InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
                                                                   InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
                                                                   boolean offline) {
-        final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
-        AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
-        entity.setRealmId(sessionToImportInto.getRealm().getId());
-
-        entity.setAction(clientSession.getAction());
-        entity.setAuthMethod(clientSession.getProtocol());
-
-        entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
-        entity.setRedirectUri(clientSession.getRedirectUri());
-        entity.setTimestamp(clientSession.getTimestamp());
+        AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(clientSession, sessionToImportInto.getRealm().getId(), offline);
+        final UUID clientSessionId = entity.getId();
 
         SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
         clientSessionUpdateTx.addTask(entity.getId(), createClientSessionTask, entity);
@@ -942,6 +1040,22 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         return new AuthenticatedClientSessionAdapter(session,this, entity, clientSession.getClient(), sessionToImportInto, userSessionUpdateTx, clientSessionUpdateTx, offline);
     }
 
+
+    private AuthenticatedClientSessionEntity createAuthenticatedClientSessionInstance(AuthenticatedClientSessionModel clientSession, String realmId, boolean offline) {
+        final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
+        AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
+        entity.setRealmId(realmId);
+
+        entity.setAction(clientSession.getAction());
+        entity.setAuthMethod(clientSession.getProtocol());
+
+        entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
+        entity.setRedirectUri(clientSession.getRedirectUri());
+        entity.setTimestamp(clientSession.getTimestamp());
+
+        return entity;
+    }
+
     private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
 
         private final String clientUuid;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
index 15d8c25..a59706a 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
@@ -32,8 +32,10 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.UserModel;
 import org.keycloak.models.UserSessionProvider;
 import org.keycloak.models.UserSessionProviderFactory;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStoreFactory;
 import org.keycloak.models.sessions.infinispan.initializer.CacheInitializer;
 import org.keycloak.models.sessions.infinispan.initializer.DBLockBasedCacheInitializer;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
@@ -80,8 +82,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
     private Config.Scope config;
 
     private RemoteCacheInvoker remoteCacheInvoker;
-    private LastSessionRefreshStore lastSessionRefreshStore;
-    private LastSessionRefreshStore offlineLastSessionRefreshStore;
+    private CrossDCLastSessionRefreshStore lastSessionRefreshStore;
+    private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
+    private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
     private InfinispanKeyGenerator keyGenerator;
 
     @Override
@@ -93,7 +96,8 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
         Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
         Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);
 
-        return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore, keyGenerator,
+        return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore,
+                persisterLastSessionRefreshStore, keyGenerator,
           cache, offlineSessionsCache, clientSessionCache, offlineClientSessionsCache, loginFailures);
     }
 
@@ -169,6 +173,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
 
                 initializer.initCache();
                 initializer.loadSessions();
+
+                // Initialize persister for periodically doing bulk DB updates of lastSessionRefresh timestamps of refreshed sessions
+                persisterLastSessionRefreshStore = new PersisterLastSessionRefreshStoreFactory().createAndInit(session, true);
             }
 
         });
@@ -233,7 +240,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
         });
 
         if (sessionsRemoteCache) {
-            lastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
+            lastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
         }
 
         Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsCache = ispn.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
@@ -248,7 +255,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
         });
 
         if (offlineSessionsRemoteCache) {
-            offlineLastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
+            offlineLastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
         }
 
         Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = ispn.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
index 8dbcc20..afbd37e 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
@@ -84,7 +84,6 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
             });
 
             state = new InitializerState(ctx[0].getSegmentsCount());
-            saveStateToCache(state);
         } else {
             KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
                 @Override
@@ -102,7 +101,7 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
     }
 
 
-    protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext ctx) {
+    protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) {
         // Assume each worker has same processor's count
         int processors = Runtime.getRuntime().availableProcessors();
 
@@ -114,6 +113,8 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
         int errors = 0;
 
         try {
+            List<SessionLoader.WorkerResult> previousResults = new LinkedList<>();
+
             while (!state.isFinished()) {
                 int nodesCount = transport==null ? 1 : transport.getMembers().size();
                 int distributedWorkersCount = processors * nodesCount;
@@ -126,34 +127,43 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
                     log.trace("unfinished segments for this iteration: " + segments);
                 }
 
-                List<Future<WorkerResult>> futures = new LinkedList<>();
+                List<Future<SessionLoader.WorkerResult>> futures = new LinkedList<>();
+
+                int workerId = 0;
                 for (Integer segment : segments) {
+                    SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, workerId, previousResults);
+
                     SessionInitializerWorker worker = new SessionInitializerWorker();
-                    worker.setWorkerEnvironment(segment, ctx, sessionLoader);
+                    worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader);
+
                     if (!distributed) {
                         worker.setEnvironment(workCache, null);
                     }
 
-                    Future<WorkerResult> future = executorService.submit(worker);
+                    Future<SessionLoader.WorkerResult> future = executorService.submit(worker);
                     futures.add(future);
+
+                    workerId++;
                 }
 
-                for (Future<WorkerResult> future : futures) {
+                boolean anyFailure = false;
+                for (Future<SessionLoader.WorkerResult> future : futures) {
                     try {
-                        WorkerResult result = future.get();
+                        SessionLoader.WorkerResult result = future.get();
+                        previousResults.add(result);
 
-                        if (result.getSuccess()) {
-                            int computedSegment = result.getSegment();
-                            state.markSegmentFinished(computedSegment);
-                        } else {
+                        if (!result.isSuccess()) {
                             if (log.isTraceEnabled()) {
                                 log.tracef("Segment %d failed to compute", result.getSegment());
                             }
+                            anyFailure = true;
                         }
                     } catch (InterruptedException ie) {
+                        anyFailure = true;
                         errors++;
                         log.error("Interruped exception when computed future. Errors: " + errors, ie);
                     } catch (ExecutionException ee) {
+                        anyFailure = true;
                         errors++;
                         log.error("ExecutionException when computed future. Errors: " + errors, ee);
                     }
@@ -163,11 +173,19 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
                     throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details");
                 }
 
-                saveStateToCache(state);
+                // Save just if no error happened. Otherwise re-compute
+                if (!anyFailure) {
+                    for (SessionLoader.WorkerResult result : previousResults) {
+                        state.markSegmentFinished(result.getSegment());
+                    }
 
-                log.debugf("New initializer state pushed. The state is: %s", state);
+                    log.debugf("New initializer state is: %s", state);
+                }
             }
 
+            // Push the state after computation is finished
+            saveStateToCache(state);
+
             // Loader callback after the task is finished
             this.sessionLoader.afterAllSessionsLoaded(this);
 
@@ -179,33 +197,4 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
         }
     }
 
-
-    public static class WorkerResult implements Serializable {
-
-        private Integer segment;
-        private Boolean success;
-
-        public static WorkerResult create (Integer segment, boolean success) {
-            WorkerResult res = new WorkerResult();
-            res.setSegment(segment);
-            res.setSuccess(success);
-            return res;
-        }
-
-        public Integer getSegment() {
-            return segment;
-        }
-
-        public void setSegment(Integer segment) {
-            this.segment = segment;
-        }
-
-        public Boolean getSuccess() {
-            return success;
-        }
-
-        public void setSuccess(Boolean success) {
-            this.success = success;
-        }
-    }
 }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
index cf62230..74ad20a 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
@@ -21,8 +21,8 @@ import org.infinispan.Cache;
 import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.context.Flag;
 import org.jboss.logging.Logger;
-import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.common.util.Retry;
+import org.keycloak.common.util.Time;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.UserSessionModel;
 import org.keycloak.models.session.UserSessionPersisterProvider;
@@ -33,7 +33,11 @@ import java.util.List;
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentUserSessionLoaderContext>, Serializable {
+public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentLoaderContext,
+        OfflinePersistentWorkerContext, OfflinePersistentWorkerResult>, Serializable {
+
+    // Placeholder String used in the searching conditions to identify very first session
+    private static final String FIRST_SESSION_ID = "000";
 
     private static final Logger log = Logger.getLogger(OfflinePersistentUserSessionLoader.class);
 
@@ -53,46 +57,67 @@ public class OfflinePersistentUserSessionLoader implements SessionLoader<Offline
 
     @Override
     public void init(KeycloakSession session) {
-        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
+    }
 
-        // TODO: check if update of timestamps in persister can be skipped entirely
-        int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
 
-        log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
+    @Override
+    public OfflinePersistentLoaderContext computeLoaderContext(KeycloakSession session) {
+        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
+        int sessionsCount = persister.getUserSessionsCount(true);
 
-        persister.clearDetachedUserSessions();
-        persister.updateAllTimestamps(clusterStartupTime);
+        return new OfflinePersistentLoaderContext(sessionsCount, sessionsPerSegment);
     }
 
 
     @Override
-    public OfflinePersistentUserSessionLoaderContext computeLoaderContext(KeycloakSession session) {
-        UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
-        int sessionsCount = persister.getUserSessionsCount(true);
+    public OfflinePersistentWorkerContext computeWorkerContext(OfflinePersistentLoaderContext loaderCtx, int segment, int workerId, List<OfflinePersistentWorkerResult> previousResults) {
+        int lastCreatedOn;
+        String lastSessionId;
+        if (previousResults.isEmpty()) {
+            lastCreatedOn = 0;
+            lastSessionId = FIRST_SESSION_ID;
+        } else {
+            OfflinePersistentWorkerResult lastResult = previousResults.get(previousResults.size() - 1);
+            lastCreatedOn = lastResult.getLastCreatedOn();
+            lastSessionId = lastResult.getLastSessionId();
+        }
 
-        return new OfflinePersistentUserSessionLoaderContext(sessionsCount, sessionsPerSegment);
+        // We know the last loaded session. New workers iteration will start from this place
+        return new OfflinePersistentWorkerContext(segment, workerId, lastCreatedOn, lastSessionId);
     }
 
 
     @Override
-    public boolean loadSessions(KeycloakSession session, OfflinePersistentUserSessionLoaderContext ctx, int segment) {
-        int first = ctx.getSessionsPerSegment() * segment;
-        int max = sessionsPerSegment;
+    public OfflinePersistentWorkerResult createFailedWorkerResult(OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext workerContext) {
+        return new OfflinePersistentWorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId(), -1, FIRST_SESSION_ID);
+    }
 
-        if (log.isTraceEnabled()) {
-            log.tracef("Loading sessions - first: %d, max: %d", first, max);
-        }
+
+    @Override
+    public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) {
+        int first = ctx.getWorkerId() * sessionsPerSegment;
+
+        log.tracef("Loading sessions for segment: %d", ctx.getSegment());
 
         UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
-        List<UserSessionModel> sessions = persister.loadUserSessions(first, max, true);
+        List<UserSessionModel> sessions = persister.loadUserSessions(first, sessionsPerSegment, true, ctx.getLastCreatedOn(), ctx.getLastSessionId());
+
+        log.tracef("Sessions loaded from DB - segment: %d", ctx.getSegment());
 
-        for (UserSessionModel persistentSession : sessions) {
+        UserSessionModel lastSession = null;
+        if (!sessions.isEmpty()) {
+            lastSession = sessions.get(sessions.size() - 1);
 
             // Save to memory/infinispan
-            UserSessionModel offlineUserSession = session.sessions().importUserSession(persistentSession, true, true);
+            session.sessions().importUserSessions(sessions, true);
         }
 
-        return true;
+        int lastCreatedOn = lastSession==null ? Time.currentTime() + 100000 : lastSession.getStarted();
+        String lastSessionId = lastSession==null ? FIRST_SESSION_ID : lastSession.getId();
+
+        log.tracef("Sessions imported to infinispan - segment: %d, lastCreatedOn: %d, lastSessionId: %s", ctx.getSegment(), lastCreatedOn, lastSessionId);
+
+        return new OfflinePersistentWorkerResult(true, ctx.getSegment(), ctx.getWorkerId(), lastCreatedOn, lastSessionId);
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java
new file mode 100644
index 0000000..8d8e3f3
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.initializer;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class OfflinePersistentWorkerContext extends SessionLoader.WorkerContext {
+
+    private final int lastCreatedOn;
+    private final String lastSessionId;
+
+    public OfflinePersistentWorkerContext(int segment, int workerId, int lastCreatedOn, String lastSessionId) {
+        super(segment, workerId);
+        this.lastCreatedOn = lastCreatedOn;
+        this.lastSessionId = lastSessionId;
+    }
+
+
+    public int getLastCreatedOn() {
+        return lastCreatedOn;
+    }
+
+    public String getLastSessionId() {
+        return lastSessionId;
+    }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java
new file mode 100644
index 0000000..44aa2c5
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.initializer;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class OfflinePersistentWorkerResult extends SessionLoader.WorkerResult {
+
+    private final int lastCreatedOn;
+    private final String lastSessionId;
+
+
+    public OfflinePersistentWorkerResult(boolean success, int segment, int workerId, int lastCreatedOn, String lastSessionId) {
+        super(success, segment, workerId);
+        this.lastCreatedOn = lastCreatedOn;
+        this.lastSessionId = lastSessionId;
+    }
+
+
+    public int getLastCreatedOn() {
+        return lastCreatedOn;
+    }
+
+
+    public String getLastSessionId() {
+        return lastSessionId;
+    }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
index 7df94bf..9bfb463 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
@@ -31,19 +31,20 @@ import java.util.Set;
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanCacheInitializer.WorkerResult>, Serializable {
+public class SessionInitializerWorker implements DistributedCallable<String, Serializable, SessionLoader.WorkerResult>, Serializable {
 
     private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
 
-    private int segment;
-    private SessionLoader.LoaderContext ctx;
+
+    private SessionLoader.LoaderContext loaderCtx;
+    private SessionLoader.WorkerContext workerCtx;
     private SessionLoader sessionLoader;
 
     private transient Cache<String, Serializable> workCache;
 
-    public void setWorkerEnvironment(int segment, SessionLoader.LoaderContext ctx, SessionLoader sessionLoader) {
-        this.segment = segment;
-        this.ctx = ctx;
+    public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader) {
+        this.loaderCtx = loaderCtx;
+        this.workerCtx = workerCtx;
         this.sessionLoader = sessionLoader;
     }
 
@@ -53,27 +54,28 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ser
     }
 
     @Override
-    public InfinispanCacheInitializer.WorkerResult call() throws Exception {
+    public SessionLoader.WorkerResult call() throws Exception {
         if (log.isTraceEnabled()) {
-            log.tracef("Running computation for segment: %d", segment);
+            log.tracef("Running computation for segment: %s", workerCtx.toString());
         }
 
         KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
         if (sessionFactory == null) {
             log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped");
-            return InfinispanCacheInitializer.WorkerResult.create(segment, false);
+            return sessionLoader.createFailedWorkerResult(loaderCtx, workerCtx);
         }
 
+        SessionLoader.WorkerResult[] ref = new SessionLoader.WorkerResult[1];
         KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
 
             @Override
             public void run(KeycloakSession session) {
-                sessionLoader.loadSessions(session, ctx, segment);
+                ref[0] = sessionLoader.loadSessions(session, loaderCtx, workerCtx);
             }
 
         });
 
-        return InfinispanCacheInitializer.WorkerResult.create(segment, true);
+        return ref[0];
     }
 
 }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
index 71b519e..83fe4e4 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
@@ -18,13 +18,16 @@
 package org.keycloak.models.sessions.infinispan.initializer;
 
 import java.io.Serializable;
+import java.util.List;
 
 import org.keycloak.models.KeycloakSession;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext> extends Serializable {
+public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext,
+        WORKER_CONTEXT extends SessionLoader.WorkerContext,
+        WORKER_RESULT extends SessionLoader.WorkerResult> extends Serializable {
 
     /**
      * Will be triggered just once on cluster coordinator node to perform some generic initialization tasks (Eg. update DB before starting load).
@@ -38,7 +41,7 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
 
     /**
      *
-     * Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to the worker task.
+     * Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to whole computation.
      * Each segment will be then later computed in one "worker" task
      *
      * This method could be expensive to call, so the "computed" loaderContext object is passed among workers/loaders and needs to be serializable
@@ -50,14 +53,36 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
 
 
     /**
+     * Compute the worker context for current iteration
+     *
+     * @param loaderCtx global loader context
+     * @param segment the current segment (page) to compute
+     * @param workerId ID of worker for current worker iteration. Usually the number 0-8 (with single cluster node)
+     * @param previousResults workerResults from previous computation. Can be empty list in case of the operation is triggered for the 1st time
+     * @return
+     */
+    WORKER_CONTEXT computeWorkerContext(LOADER_CONTEXT loaderCtx, int segment, int workerId, List<WORKER_RESULT> previousResults);
+
+
+    /**
      * Will be called on all cluster nodes to load the specified page.
      *
      * @param session
-     * @param loaderContext loaderContext object, which was already computed before
-     * @param segment to be computed
+     * @param loaderContext global loaderContext object, which was already computed before
+     * @param workerContext for current iteration
      * @return
      */
-    boolean loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, int segment);
+    WORKER_RESULT loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
+
+
+    /**
+     * Called when it's not possible to compute current iteration and load session for some reason (EG. infinispan not yet fully initialized)
+     *
+     * @param loaderContext
+     * @param workerContext
+     * @return
+     */
+    WORKER_RESULT createFailedWorkerResult(LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
 
 
     /**
@@ -81,9 +106,78 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
      * Object, which contains some context data to be used by SessionLoader implementation. It's computed just once and then passed
      * to each {@link SessionLoader}. It needs to be {@link Serializable}
      */
-    interface LoaderContext extends Serializable {
+    class LoaderContext implements Serializable {
+
+        private final int segmentsCount;
+
+        public LoaderContext(int segmentsCount) {
+            this.segmentsCount = segmentsCount;
+        }
+
+
+        public int getSegmentsCount() {
+            return segmentsCount;
+        }
+
+    }
+
+
+    /**
+     * Object, which is computed before each worker iteration and contains some data to be used by the corresponding worker iteration.
+     * For example info about which segment/page should be loaded by current worker.
+     */
+    class WorkerContext implements Serializable {
+
+        private final int segment;
+        private final int workerId;
+
+        public WorkerContext(int segment, int workerId) {
+            this.segment = segment;
+            this.workerId = workerId;
+        }
+
+
+        public int getSegment() {
+            return this.segment;
+        }
+
+
+        public int getWorkerId() {
+            return this.workerId;
+        }
+    }
+
+
+    /**
+     * Result of single worker iteration
+     */
+    class WorkerResult implements Serializable {
+
+        private final boolean success;
+        private final int segment;
+        private final int workerId;
+
+
+        public WorkerResult(boolean success, int segment, int workerId) {
+            this.success = success;
+            this.segment = segment;
+            this.workerId = workerId;
+        }
+
+
+        public boolean isSuccess() {
+            return success;
+        }
+
+
+        public int getSegment() {
+            return segment;
+        }
+
 
-        int getSegmentsCount();
+        public int getWorkerId() {
+            return workerId;
+        }
 
     }
 }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
index 233a58a..2e927df 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
@@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remotestore;
 
 import java.io.Serializable;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,7 +43,7 @@ import static org.infinispan.client.hotrod.impl.Util.await;
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext>, Serializable {
+public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult>, Serializable {
 
     private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class);
 
@@ -92,26 +93,38 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
 
 
     @Override
-    public boolean loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext context, int segment) {
+    public WorkerContext computeWorkerContext(RemoteCacheSessionsLoaderContext loaderCtx, int segment, int workerId, List<WorkerResult> previousResults) {
+        return new WorkerContext(segment, workerId);
+    }
+
+
+    @Override
+    public WorkerResult createFailedWorkerResult(RemoteCacheSessionsLoaderContext loaderContext, WorkerContext workerContext) {
+        return new WorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId());
+    }
+
+
+    @Override
+    public WorkerResult loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext loaderContext, WorkerContext ctx) {
         Cache cache = getCache(session);
         Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES);
         RemoteCache remoteCache = getRemoteCache(session);
 
-        Set<Integer> myIspnSegments = getMyIspnSegments(segment, context);
+        Set<Integer> myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext);
 
-        log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), segment);
+        log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment());
 
         CloseableIterator<Map.Entry> iterator = null;
         int countLoaded = 0;
         try {
-            iterator = remoteCache.retrieveEntries(null, myIspnSegments, context.getSessionsPerSegment());
+            iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment());
             while (iterator.hasNext()) {
                 countLoaded++;
                 Map.Entry entry = iterator.next();
                 decoratedCache.putAsync(entry.getKey(), entry.getValue());
             }
         } catch (RuntimeException e) {
-            log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), segment);
+            log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
             throw e;
         } finally {
             if (iterator != null) {
@@ -119,9 +132,9 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
             }
         }
 
-        log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), segment, countLoaded);
+        log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded);
 
-        return true;
+        return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
index ec74871..bf8787c 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
@@ -17,38 +17,34 @@
 
 package org.keycloak.models.sessions.infinispan.remotestore;
 
-import java.io.Serializable;
-
 import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
 
 /**
  *
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderContext, Serializable {
+public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext {
 
     // Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches
     private final int ispnSegmentsCount;
 
-    // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
-    // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
-    // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
-    // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
-    private final int segmentsCount;
-
     private final int sessionsPerSegment;
     private final int sessionsTotal;
 
 
     public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) {
+        super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount));
         this.ispnSegmentsCount = ispnSegmentsCount;
         this.sessionsPerSegment = sessionsPerSegment;
         this.sessionsTotal = sessionsTotal;
-        this.segmentsCount = computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount);
     }
 
 
-    private int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
+    // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
+    // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
+    // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
+    // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
+    private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
         // No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered)
         if (ispnSegments < 0) {
             return 1;
@@ -68,11 +64,6 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
     }
 
 
-    @Override
-    public int getSegmentsCount() {
-        return segmentsCount;
-    }
-
     public int getIspnSegmentsCount() {
         return ispnSegmentsCount;
     }
@@ -89,7 +80,7 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
     @Override
     public String toString() {
         return new StringBuilder("RemoteCacheSessionsLoaderContext [ ")
-                .append("segmentsCount: ").append(segmentsCount)
+                .append("segmentsCount: ").append(getSegmentsCount())
                 .append(", ispnSegmentsCount: ").append(ispnSegmentsCount)
                 .append(", sessionsPerSegment: ").append(sessionsPerSegment)
                 .append(", sessionsTotal: ").append(sessionsTotal)
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
index a7c6c7f..7355e6c 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
@@ -24,10 +24,11 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.UserModel;
 import org.keycloak.models.UserSessionModel;
 import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.changes.Tasks;
 import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener;
 import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
 import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
@@ -42,6 +43,8 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE;
+
 /**
  * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
  */
@@ -207,6 +210,15 @@ public class UserSessionAdapter implements UserSessionModel {
     }
 
     public void setLastSessionRefresh(int lastSessionRefresh) {
+        if (offline) {
+            // Received the message from the other DC that we should update the lastSessionRefresh in local cluster. Don't update DB in that case.
+            // The other DC already did.
+            Boolean ignoreRemoteCacheUpdate = (Boolean) session.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE);
+            if (ignoreRemoteCacheUpdate == null || !ignoreRemoteCacheUpdate) {
+                provider.getPersisterLastSessionRefreshStore().putLastSessionRefresh(session, entity.getId(), realm.getId(), lastSessionRefresh);
+            }
+        }
+
         UserSessionUpdateTask task = new UserSessionUpdateTask() {
 
             @Override
@@ -216,7 +228,7 @@ public class UserSessionAdapter implements UserSessionModel {
 
             @Override
             public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<UserSessionEntity> sessionWrapper) {
-                return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
+                return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
                         .shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
             }
 
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
index 950a4c0..8ae5053 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
@@ -31,6 +31,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoader;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@@ -110,7 +111,7 @@ public class RemoteCacheSessionsLoaderTest {
             Set<String> visitedKeys = new HashSet<>();
             for (int currentSegment=0 ; currentSegment<ctx.getSegmentsCount() ; currentSegment++) {
                 logger.infof("Loading segment %d", currentSegment);
-                loader.loadSessions(null, ctx, currentSegment);
+                loader.loadSessions(null, ctx, new SessionLoader.WorkerContext(currentSegment, currentSegment));
 
                 logger.infof("Loaded %d keys for segment %d", cache2.keySet().size(), currentSegment);
                 totalCount = totalCount + cache2.keySet().size();
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
index e439b66..dd0cf32 100755
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
@@ -11,6 +11,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -25,7 +27,12 @@ public class ConcurrencyLockingTest {
     public void testLocking() throws Exception {
         final DefaultCacheManager cacheManager = getVersionedCacheManager();
         Cache<String, String> cache = cacheManager.getCache("COUNTER_CACHE");
-        cache.put("key", "init");
+
+        Map<String, String> map = new HashMap<>();
+        map.put("key1", "val1");
+        map.put("key2", "val2");
+        cache.putAll(map);
+
         ExecutorService executor = Executors.newSingleThreadExecutor();
         executor.execute(new Runnable() {
             @Override
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
index 12a3ee5..719a39a 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
@@ -17,17 +17,18 @@
 
 package org.keycloak.models.sessions.infinispan.initializer;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.infinispan.Cache;
+import org.infinispan.client.hotrod.ProtocolVersion;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.commons.api.BasicCache;
 import org.infinispan.configuration.cache.CacheMode;
 import org.infinispan.configuration.cache.Configuration;
 import org.infinispan.configuration.cache.ConfigurationBuilder;
 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.context.Flag;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@@ -37,7 +38,6 @@ import org.keycloak.common.util.Time;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
 import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
-import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import java.util.UUID;
 
@@ -49,20 +49,36 @@ import java.util.UUID;
 @Ignore
 public class DistributedCacheConcurrentWritesTest {
 
-    private static final int ITERATION_PER_WORKER = 1000;
+    private static final int BATCHES_PER_WORKER = 1000;
+    private static final int ITEMS_IN_BATCH = 100;
 
-    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
-    private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
+    public static void main(String[] args) throws Exception {
+        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createCache("node1");
+        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createCache("node2");
 
-    private static final UUID CLIENT_1_UUID = UUID.randomUUID();
+        // NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232
+//        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createRemoteCache("node1");
+//        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createRemoteCache("node2");
+
+        try {
+            testConcurrentPut(cache1, cache2);
+        } finally {
+
+            // Kill JVM
+            cache1.stop();
+            cache2.stop();
+            stopMgr(cache1);
+            stopMgr(cache2);
+
+            System.out.println("Managers killed");
+        }
+    }
 
-    public static void main(String[] args) throws Exception {
-        CacheWrapper<String, UserSessionEntity> cache1 = createCache("node1");
-        CacheWrapper<String, UserSessionEntity> cache2 = createCache("node2");
 
+    private static SessionEntityWrapper<UserSessionEntity> createEntityInstance(String id) {
         // Create initial item
         UserSessionEntity session = new UserSessionEntity();
-        session.setId("123");
+        session.setId(id);
         session.setRealmId("foo");
         session.setBrokerSessionId("!23123123");
         session.setBrokerUserId(null);
@@ -76,177 +92,97 @@ public class DistributedCacheConcurrentWritesTest {
         clientSession.setAuthMethod("saml");
         clientSession.setAction("something");
         clientSession.setTimestamp(1234);
-        session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
-
-        try {
-            for (int i = 0; i < 10; i++) {
-                testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
-            }
-        } finally {
-
-            // Kill JVM
-            cache1.getCache().stop();
-            cache2.getCache().stop();
-            cache1.getCache().getCacheManager().stop();
-            cache2.getCache().getCacheManager().stop();
+        session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId());
 
-            System.out.println("Managers killed");
-        }
+        return new SessionEntityWrapper<>(session);
     }
 
 
     // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
-    private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
-                                                 CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
-        cache1.put(key, session);
+    private static void testConcurrentPut(BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1,
+                                          BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2) throws InterruptedException {
 
-        // Create 2 workers for concurrent write and start them
-        Worker worker1 = new Worker(1, cache1, key);
-        Worker worker2 = new Worker(2, cache2, key);
+        // Create workers for concurrent write and start them
+        Worker worker1 = new Worker(1, cache1);
+        Worker worker2 = new Worker(2, cache2);
 
         long start = System.currentTimeMillis();
 
-        System.out.println("Started clustering test for key " + key);
+        System.out.println("Started clustering test");
 
         worker1.start();
         //worker1.join();
         worker2.start();
 
-        Thread.sleep(1000);
-        // Try to remove the entity after some sleep time.
-        cache1.wrappedCache.getAdvancedCache()
-                .withFlags(Flag.CACHE_MODE_LOCAL)
-                .remove(key);
-
         worker1.join();
         worker2.join();
 
         long took = System.currentTimeMillis() - start;
 
-        System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
-
-//        System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
-//                + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+        System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size());
 
-//        // JGroups statistics
-//        JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
-//        System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
-//                ", received messages: " + channel.getReceivedMessages());
+        // JGroups statistics
+        printStats(cache1);
     }
 
 
     private static class Worker extends Thread {
 
-        private final CacheWrapper<String, UserSessionEntity> cache;
-        private final int threadId;
-        private final String key;
+        private final BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache;
+        private final int startIndex;
 
-        public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
-            this.threadId = threadId;
+        public Worker(int threadId, BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache) {
             this.cache = cache;
-            this.key = key;
-            setName("th-" + key + "-" + threadId);
+            this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER);
+            setName("th-" + threadId);
         }
 
         @Override
         public void run() {
 
-            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {
-
-                String noteKey = "n-" + threadId + "-" + i;
-
-                  // This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
-//                boolean replaced = false;
-//                while (!replaced) {
-//                    SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
-//                    oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
-//                    replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
-//                }
-
-                int count = 0;
-                boolean replaced = false;
-                while (!replaced && count < 25) {
-                    count++;
-                    SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
-                    oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
-                    replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
-                }
-                if (!replaced) {
-                    System.err.println("FAILED TO REPLACE ENTITY: " + key);
-                    return;
-                }
-            }
+            for (int page = 0; page < BATCHES_PER_WORKER ; page++) {
+                int startPageIndex = startIndex + page * ITEMS_IN_BATCH;
 
-        }
+                putItemsClassic(startPageIndex);
+                //putItemsAll(startPageIndex);
 
-        private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
-            try {
-                boolean replaced = cache.replace(key, oldSession, newSession);
-                //cache.replace(key, newSession);
-                if (!replaced) {
-                    failedReplaceCounter.incrementAndGet();
-                    //return false;
-                    //System.out.println("Replace failed!!!");
-                }
-                return replaced;
-            } catch (Exception re) {
-                failedReplaceCounter2.incrementAndGet();
-                return false;
+                System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1));
             }
-            //return replaced;
         }
 
-    }
-
-    // Session clone
-
-    private static UserSessionEntity cloneSession(UserSessionEntity session) {
-        UserSessionEntity clone = new UserSessionEntity();
-        clone.setId(session.getId());
-        clone.setRealmId(session.getRealmId());
-        clone.setNotes(new ConcurrentHashMap<>(session.getNotes()));
-        return clone;
-    }
-
 
-    // Cache creation utils
-
-    public static class CacheWrapper<K, V extends SessionEntity> {
-
-        private final Cache<K, SessionEntityWrapper<V>> wrappedCache;
-
-        public CacheWrapper(Cache<K, SessionEntityWrapper<V>> wrappedCache) {
-            this.wrappedCache = wrappedCache;
+        // put items 1 by 1
+        private void putItemsClassic(int startPageIndex) {
+            for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
+                String key = "key-" + startIndex + i;
+                SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
+                cache.put(key, session);
+            }
         }
 
 
-        public SessionEntityWrapper<V> get(K key) {
-            SessionEntityWrapper<V> val = wrappedCache.get(key);
-            return val;
-        }
-
-        public void put(K key, V newVal) {
-            SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
-            wrappedCache.put(key, newWrapper);
-        }
+        // put all items together
+        private void putItemsAll(int startPageIndex) {
+            Map<String, SessionEntityWrapper<UserSessionEntity>> mapp = new HashMap<>();
 
+            for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
+                String key = "key-" + startIndex + i;
+                SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
+                mapp.put(key, session);
+            }
 
-        public boolean replace(K key, SessionEntityWrapper<V> oldVal, V newVal) {
-            SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
-            return wrappedCache.replace(key, oldVal, newWrapper);
+            cache.putAll(mapp);
         }
+    }
 
-        private Cache<K, SessionEntityWrapper<V>> getCache() {
-            return wrappedCache;
-        }
 
-    }
+    // Cache creation utils
 
 
-    public static CacheWrapper<String, UserSessionEntity> createCache(String nodeName) {
+    public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createCache(String nodeName) {
         EmbeddedCacheManager mgr = createManager(nodeName);
-        Cache<String, SessionEntityWrapper<UserSessionEntity>> wrapped = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
-        return new CacheWrapper<>(wrapped);
+        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
+        return cache;
     }
 
 
@@ -272,7 +208,7 @@ public class DistributedCacheConcurrentWritesTest {
         ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
         if (clustered) {
             distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
-            distConfigBuilder.clustering().hash().numOwners(2);
+            distConfigBuilder.clustering().hash().numOwners(1);
 
             // Disable L1 cache
             distConfigBuilder.clustering().hash().l1().enabled(false);
@@ -283,4 +219,42 @@ public class DistributedCacheConcurrentWritesTest {
         return cacheManager;
 
     }
+
+
+    public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createRemoteCache(String nodeName) {
+        int port = ("node1".equals(nodeName)) ? 12232 : 13232;
+
+        org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
+        org.infinispan.client.hotrod.configuration.Configuration cfg = builder
+                .addServer().host("localhost").port(port)
+                .version(ProtocolVersion.PROTOCOL_VERSION_26)
+                .build();
+        RemoteCacheManager mgr = new RemoteCacheManager(cfg);
+        return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
+    }
+
+    // CLEANUP METHODS
+
+    private static void stopMgr(BasicCache cache) {
+        if (cache instanceof Cache) {
+            ((Cache) cache).getCacheManager().stop();
+        } else {
+            ((RemoteCache) cache).getRemoteCacheManager().stop();
+        }
+    }
+
+
+    private static void printStats(BasicCache cache) {
+        if (cache instanceof Cache) {
+            Cache cache1 = (Cache) cache;
+
+            JChannel channel = ((JGroupsTransport)cache1.getAdvancedCache().getRpcManager().getTransport()).getChannel();
+
+            System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
+                    ", received messages: " + channel.getReceivedMessages());
+        } else {
+            Map<String, String> stats = ((RemoteCache) cache).stats().getStatsMap();
+            System.out.println("Stats: " + stats);
+        }
+    }
 }
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
index 1e72aa8..8a04231 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
@@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.initializer;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.keycloak.models.cache.infinispan.UserCacheSession;
 import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
 import org.keycloak.storage.CacheableStorageProviderModel;
 
@@ -35,16 +34,16 @@ public class InitializerStateTest {
 
     @Test
     public void testOfflineLoaderContext() {
-        OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
+        OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
         Assert.assertEquals(ctx.getSegmentsCount(), 6);
 
-        ctx = new OfflinePersistentUserSessionLoaderContext(19, 5);
+        ctx = new OfflinePersistentLoaderContext(19, 5);
         Assert.assertEquals(ctx.getSegmentsCount(), 4);
 
-        ctx = new OfflinePersistentUserSessionLoaderContext(20, 5);
+        ctx = new OfflinePersistentLoaderContext(20, 5);
         Assert.assertEquals(ctx.getSegmentsCount(), 4);
 
-        ctx = new OfflinePersistentUserSessionLoaderContext(21, 5);
+        ctx = new OfflinePersistentLoaderContext(21, 5);
         Assert.assertEquals(ctx.getSegmentsCount(), 5);
     }
 
@@ -78,7 +77,7 @@ public class InitializerStateTest {
 
     @Test
     public void testComputationState() {
-        OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
+        OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
         Assert.assertEquals(ctx.getSegmentsCount(), 6);
 
         InitializerState state = new InitializerState(ctx.getSegmentsCount());
diff --git a/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java
new file mode 100644
index 0000000..2aa1b11
--- /dev/null
+++ b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.connections.jpa.updater.liquibase.custom;
+
+import liquibase.exception.CustomChangeException;
+import liquibase.statement.core.UpdateStatement;
+import liquibase.structure.core.Table;
+import org.keycloak.common.util.Time;
+
+/**
+ * Update CREATED_ON and LAST_SESSION_REFRESH columns to current startup time
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class JpaUpdate4_7_0_OfflineSessionsTimestamps extends CustomKeycloakTask {
+
+    @Override
+    protected void generateStatementsImpl() throws CustomChangeException {
+        String offlineUserSessionsTableName = database.correctObjectName("OFFLINE_USER_SESSION", Table.class);
+
+        try {
+            int currentTime = Time.currentTime();
+
+            UpdateStatement updateStatement = new UpdateStatement(null, null, offlineUserSessionsTableName)
+                    .addNewColumnValue("LAST_SESSION_REFRESH", currentTime);
+
+            statements.add(updateStatement);
+
+            confirmationMessage.append("Updated column LAST_SESSION_REFRESH in OFFLINE_USER_SESSION table with time " + currentTime);
+        } catch (Exception e) {
+            throw new CustomChangeException(getTaskId() + ": Exception when updating data from previous version", e);
+        }
+    }
+
+    @Override
+    protected String getTaskId() {
+        return "Update 4.7.0.Final";
+    }
+}
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
index 587ed7e..132af92 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
@@ -18,10 +18,10 @@
 package org.keycloak.models.jpa.session;
 
 import org.jboss.logging.Logger;
+import org.keycloak.common.util.Time;
 import org.keycloak.models.AuthenticatedClientSessionModel;
 import org.keycloak.models.ClientModel;
 import org.keycloak.models.KeycloakSession;
-import org.keycloak.models.ModelException;
 import org.keycloak.models.RealmModel;
 import org.keycloak.models.UserModel;
 import org.keycloak.models.UserSessionModel;
@@ -30,17 +30,23 @@ import org.keycloak.models.session.PersistentClientSessionModel;
 import org.keycloak.models.session.PersistentUserSessionAdapter;
 import org.keycloak.models.session.PersistentUserSessionModel;
 import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.SessionTimeoutHelper;
 import org.keycloak.storage.StorageId;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 import javax.persistence.TypedQuery;
+
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -63,6 +69,7 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
 
         PersistentUserSessionEntity entity = new PersistentUserSessionEntity();
         entity.setUserSessionId(model.getUserSessionId());
+        entity.setCreatedOn(model.getStarted());
         entity.setRealmId(adapter.getRealm().getId());
         entity.setUserId(adapter.getUser().getId());
         String offlineStr = offlineToString(offline);
@@ -100,26 +107,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
     }
 
     @Override
-    public void updateUserSession(UserSessionModel userSession, boolean offline) {
-        PersistentUserSessionAdapter adapter;
-        if (userSession instanceof PersistentUserSessionAdapter) {
-            adapter = (PersistentUserSessionAdapter) userSession;
-        } else {
-            adapter = new PersistentUserSessionAdapter(userSession);
-        }
-
-        PersistentUserSessionModel model = adapter.getUpdatedModel();
-
-        String offlineStr = offlineToString(offline);
-        PersistentUserSessionEntity entity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(userSession.getId(), offlineStr));
-        if (entity == null) {
-            throw new ModelException("UserSession with ID " + userSession.getId() + ", offline: " + offline + " not found");
-        }
-        entity.setLastSessionRefresh(model.getLastSessionRefresh());
-        entity.setData(model.getData());
-    }
-
-    @Override
     public void removeUserSession(String userSessionId, boolean offline) {
         String offlineStr = offlineToString(offline);
 
@@ -200,7 +187,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
                     .setParameter("externalClientId", clientStorageId.getExternalId())
                     .executeUpdate();
         }
-        num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
     }
 
     @Override
@@ -213,24 +199,53 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
         num = em.createNamedQuery("deleteUserSessionsByUser").setParameter("userId", userId).executeUpdate();
     }
 
+
     @Override
-    public void clearDetachedUserSessions() {
-        int num = em.createNamedQuery("deleteDetachedClientSessions").executeUpdate();
-        num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
+    public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
+        String offlineStr = offlineToString(offline);
+
+        int us = em.createNamedQuery("updateUserSessionLastSessionRefresh")
+                .setParameter("lastSessionRefresh", lastSessionRefresh)
+                .setParameter("realmId", realm.getId())
+                .setParameter("offline", offlineStr)
+                .setParameter("userSessionIds", userSessionIds)
+                .executeUpdate();
+
+        logger.debugf("Updated lastSessionRefresh of %d user sessions in realm '%s'", us, realm.getName());
     }
 
     @Override
-    public void updateAllTimestamps(int time) {
-        int num = em.createNamedQuery("updateClientSessionsTimestamps").setParameter("timestamp", time).executeUpdate();
-        num = em.createNamedQuery("updateUserSessionsTimestamps").setParameter("lastSessionRefresh", time).executeUpdate();
+    public void removeExpired(RealmModel realm) {
+        int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
+
+        String offlineStr = offlineToString(true);
+
+        logger.tracef("Trigger removing expired user sessions for realm '%s'", realm.getName());
+
+        int cs = em.createNamedQuery("deleteExpiredClientSessions")
+                .setParameter("realmId", realm.getId())
+                .setParameter("lastSessionRefresh", expiredOffline)
+                .setParameter("offline", offlineStr)
+                .executeUpdate();
+
+        int us = em.createNamedQuery("deleteExpiredUserSessions")
+                .setParameter("realmId", realm.getId())
+                .setParameter("lastSessionRefresh", expiredOffline)
+                .setParameter("offline", offlineStr)
+                .executeUpdate();
+
+        logger.debugf("Removed %d expired user sessions and %d expired client sessions in realm '%s'", us, cs, realm.getName());
+
     }
 
     @Override
-    public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
+    public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
         String offlineStr = offlineToString(offline);
 
         TypedQuery<PersistentUserSessionEntity> query = em.createNamedQuery("findUserSessions", PersistentUserSessionEntity.class);
         query.setParameter("offline", offlineStr);
+        query.setParameter("lastCreatedOn", lastCreatedOn);
+        query.setParameter("lastSessionId", lastUserSessionId);
 
         if (firstResult != -1) {
             query.setFirstResult(firstResult);
@@ -239,26 +254,14 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
             query.setMaxResults(maxResults);
         }
 
-        List<PersistentUserSessionEntity> results = query.getResultList();
-        List<UserSessionModel> result = new ArrayList<>();
-        List<String> userSessionIds = new ArrayList<>();
-        for (PersistentUserSessionEntity entity : results) {
-            RealmModel realm = session.realms().getRealm(entity.getRealmId());
-            try {
-                UserModel user = session.users().getUserById(entity.getUserId(), realm);
-                // Case when user was deleted in the meantime
-                if (user == null) {
-                    onUserRemoved(realm, entity.getUserId());
-                    return loadUserSessions(firstResult, maxResults, offline);
-                }
-            } catch (Exception e) {
-                logger.debugv(e,"Failed to load user with id {0}", entity.getUserId());
-            }
+        List<PersistentUserSessionAdapter> result = query.getResultStream()
+                .map(this::toAdapter)
+                .collect(Collectors.toList());
 
+        Map<String, PersistentUserSessionAdapter> sessionsById = result.stream()
+                .collect(Collectors.toMap(UserSessionModel::getId, Function.identity()));
 
-            result.add(toAdapter(realm, entity));
-            userSessionIds.add(entity.getUserSessionId());
-        }
+        Set<String> userSessionIds = sessionsById.keySet();
 
         Set<String> removedClientUUIDs = new HashSet<>();
 
@@ -268,28 +271,17 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
             query2.setParameter("offline", offlineStr);
             List<PersistentClientSessionEntity> clientSessions = query2.getResultList();
 
-            // Assume both userSessions and clientSessions ordered by userSessionId
-            int j = 0;
-            for (UserSessionModel ss : result) {
-                PersistentUserSessionAdapter userSession = (PersistentUserSessionAdapter) ss;
-                Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions(); // This is empty now and we want to fill it
-
-                boolean next = true;
-                while (next && j < clientSessions.size()) {
-                    PersistentClientSessionEntity clientSession = clientSessions.get(j);
-                    if (clientSession.getUserSessionId().equals(userSession.getId())) {
-                        PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
-
-                        // Case when client was removed in the meantime
-                        if (clientSessAdapter.getClient() == null) {
-                            removedClientUUIDs.add(clientSession.getClientId());
-                        } else {
-                            currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
-                        }
-                        j++;
-                    } else {
-                        next = false;
-                    }
+            for (PersistentClientSessionEntity clientSession : clientSessions) {
+                PersistentUserSessionAdapter userSession = sessionsById.get(clientSession.getUserSessionId());
+
+                PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
+                Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions();
+
+                // Case when client was removed in the meantime
+                if (clientSessAdapter.getClient() == null) {
+                    removedClientUUIDs.add(clientSession.getClientId());
+                } else {
+                    currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
                 }
             }
         }
@@ -298,12 +290,18 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
             onClientRemoved(clientUUID);
         }
 
-        return result;
+        return (List) result;
+    }
+
+    private PersistentUserSessionAdapter toAdapter(PersistentUserSessionEntity entity) {
+        RealmModel realm = session.realms().getRealm(entity.getRealmId());
+        return toAdapter(realm, entity);
     }
 
     private PersistentUserSessionAdapter toAdapter(RealmModel realm, PersistentUserSessionEntity entity) {
         PersistentUserSessionModel model = new PersistentUserSessionModel();
         model.setUserSessionId(entity.getUserSessionId());
+        model.setStarted(entity.getCreatedOn());
         model.setLastSessionRefresh(entity.getLastSessionRefresh());
         model.setData(entity.getData());
         model.setOffline(offlineFromString(entity.getOffline()));
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
index 44c3188..5703f18 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
@@ -36,10 +36,9 @@ import java.io.Serializable;
         @NamedQuery(name="deleteClientSessionsByClientStorageProvider", query="delete from PersistentClientSessionEntity sess where sess.clientStorageProvider = :clientStorageProvider"),
         @NamedQuery(name="deleteClientSessionsByUser", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.userId = :userId)"),
         @NamedQuery(name="deleteClientSessionsByUserSession", query="delete from PersistentClientSessionEntity sess where sess.userSessionId = :userSessionId and sess.offline = :offline"),
-        @NamedQuery(name="deleteDetachedClientSessions", query="delete from PersistentClientSessionEntity sess where NOT EXISTS (select u.userSessionId from PersistentUserSessionEntity u where u.userSessionId = sess.userSessionId )"),
+        @NamedQuery(name="deleteExpiredClientSessions", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.realmId = :realmId AND u.offline = :offline AND u.lastSessionRefresh < :lastSessionRefresh)"),
         @NamedQuery(name="findClientSessionsByUserSession", query="select sess from PersistentClientSessionEntity sess where sess.userSessionId=:userSessionId and sess.offline = :offline"),
-        @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId"),
-        @NamedQuery(name="updateClientSessionsTimestamps", query="update PersistentClientSessionEntity c set timestamp = :timestamp"),
+        @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId")
 })
 @Table(name="OFFLINE_CLIENT_SESSION")
 @Entity
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
index cc35be2..ade3ea0 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
@@ -34,10 +34,13 @@ import java.io.Serializable;
 @NamedQueries({
         @NamedQuery(name="deleteUserSessionsByRealm", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId"),
         @NamedQuery(name="deleteUserSessionsByUser", query="delete from PersistentUserSessionEntity sess where sess.userId = :userId"),
-        @NamedQuery(name="deleteDetachedUserSessions", query="delete from PersistentUserSessionEntity sess where NOT EXISTS (select c.userSessionId from PersistentClientSessionEntity c where c.userSessionId = sess.userSessionId)"),
+        @NamedQuery(name="deleteExpiredUserSessions", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId AND sess.offline = :offline AND sess.lastSessionRefresh < :lastSessionRefresh"),
+        @NamedQuery(name="updateUserSessionLastSessionRefresh", query="update PersistentUserSessionEntity sess set lastSessionRefresh = :lastSessionRefresh where sess.realmId = :realmId" +
+                " AND sess.offline = :offline AND sess.userSessionId IN (:userSessionIds)"),
         @NamedQuery(name="findUserSessionsCount", query="select count(sess) from PersistentUserSessionEntity sess where sess.offline = :offline"),
-        @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline order by sess.userSessionId"),
-        @NamedQuery(name="updateUserSessionsTimestamps", query="update PersistentUserSessionEntity c set lastSessionRefresh = :lastSessionRefresh"),
+        @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline" +
+                " AND (sess.createdOn > :lastCreatedOn OR (sess.createdOn = :lastCreatedOn AND sess.userSessionId > :lastSessionId))" +
+                " order by sess.createdOn,sess.userSessionId")
 
 })
 @Table(name="OFFLINE_USER_SESSION")
@@ -55,6 +58,9 @@ public class PersistentUserSessionEntity {
     @Column(name="USER_ID", length = 255)
     protected String userId;
 
+    @Column(name = "CREATED_ON")
+    protected int createdOn;
+
     @Column(name = "LAST_SESSION_REFRESH")
     protected int lastSessionRefresh;
 
@@ -90,6 +96,14 @@ public class PersistentUserSessionEntity {
         this.userId = userId;
     }
 
+    public int getCreatedOn() {
+        return createdOn;
+    }
+
+    public void setCreatedOn(int createdOn) {
+        this.createdOn = createdOn;
+    }
+
     public int getLastSessionRefresh() {
         return lastSessionRefresh;
     }
diff --git a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
index f8c9693..43a9871 100644
--- a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
+++ b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
@@ -19,9 +19,30 @@
 
     <changeSet author="sguilhen@redhat.com" id="4.7.0-KEYCLOAK-1267">
         <addColumn tableName="REALM">
-            <column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT"/>
-            <column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT"/>
+            <column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
+            <column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
         </addColumn>
     </changeSet>
 
+
+    <changeSet author="keycloak" id="4.7.0-KEYCLOAK-7275">
+        <renameColumn tableName="OFFLINE_USER_SESSION" oldColumnName="LAST_SESSION_REFRESH" newColumnName="CREATED_ON" columnDataType="INT" />
+
+        <addNotNullConstraint tableName="OFFLINE_USER_SESSION" columnName="CREATED_ON" columnDataType="INT" defaultNullValue="0" />
+
+        <addColumn tableName="OFFLINE_USER_SESSION">
+            <column name="LAST_SESSION_REFRESH" type="INT" defaultValueNumeric="0">
+                <constraints nullable="false"/>
+            </column>
+        </addColumn>
+
+        <!--Update "lastSessionRefresh" to the current time when migrating from previous version-->
+        <customChange class="org.keycloak.connections.jpa.updater.liquibase.custom.JpaUpdate4_7_0_OfflineSessionsTimestamps"/>
+
+        <createIndex indexName="IDX_OFFLINE_USS_CREATEDON" tableName="OFFLINE_USER_SESSION">
+            <column name="CREATED_ON" type="INT"/>
+        </createIndex>
+
+    </changeSet>
+
 </databaseChangeLog>
diff --git a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
index ac1c224..61d563d 100755
--- a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
@@ -19,6 +19,7 @@ package org.keycloak.models;
 
 import org.keycloak.provider.Provider;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -63,7 +64,7 @@ public interface UserSessionProvider extends Provider {
     void removeUserSession(RealmModel realm, UserSessionModel session);
     void removeUserSessions(RealmModel realm, UserModel user);
 
-    /** Implementation should propagate removal of expired userSessions to userSessionPersister too **/
+    /** Implementation doesn't need to propagate removal of expired userSessions to userSessionPersister. Cleanup on persister will be called separately **/
     void removeExpired(RealmModel realm);
     void removeUserSessions(RealmModel realm);
 
@@ -89,8 +90,8 @@ public interface UserSessionProvider extends Provider {
     long getOfflineSessionsCount(RealmModel realm, ClientModel client);
     List<UserSessionModel> getOfflineUserSessions(RealmModel realm, ClientModel client, int first, int max);
 
-    /** Triggered by persister during pre-load. It optionally imports authenticatedClientSessions too if requested. Otherwise the imported UserSession will have empty list of AuthenticationSessionModel **/
-    UserSessionModel importUserSession(UserSessionModel persistentUserSession, boolean offline, boolean importAuthenticatedClientSessions);
+    /** Triggered by persister during pre-load. It imports authenticatedClientSessions too **/
+    void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline);
 
     void close();
 
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
index 10b28a2..f818f16 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
@@ -26,6 +26,7 @@ import org.keycloak.models.RealmModel;
 import org.keycloak.models.UserModel;
 import org.keycloak.models.UserSessionModel;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -75,11 +76,6 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
     }
 
     @Override
-    public void updateUserSession(UserSessionModel userSession, boolean offline) {
-
-    }
-
-    @Override
     public void removeUserSession(String userSessionId, boolean offline) {
 
     }
@@ -105,17 +101,17 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
     }
 
     @Override
-    public void clearDetachedUserSessions() {
+    public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
 
     }
 
     @Override
-    public void updateAllTimestamps(int time) {
+    public void removeExpired(RealmModel realm) {
 
     }
 
     @Override
-    public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
+    public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
         return Collections.emptyList();
     }
 
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
index 61d5818..7fcec1c 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
@@ -54,12 +54,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
         data.setIpAddress(other.getIpAddress());
         data.setNotes(other.getNotes());
         data.setRememberMe(other.isRememberMe());
-        data.setStarted(other.getStarted());
         if (other.getState() != null) {
             data.setState(other.getState().toString());
         }
 
         this.model = new PersistentUserSessionModel();
+        this.model.setStarted(other.getStarted());
         this.model.setUserSessionId(other.getId());
         this.model.setLastSessionRefresh(other.getLastSessionRefresh());
 
@@ -157,7 +157,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
 
     @Override
     public int getStarted() {
-        return getData().getStarted();
+        return model.getStarted();
     }
 
     @Override
@@ -274,6 +274,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
         @JsonProperty("rememberMe")
         private boolean rememberMe;
 
+        // TODO: Keeping those just for backwards compatibility. @JsonIgnoreProperties doesn't work on Wildfly - probably due to classloading issues
         @JsonProperty("started")
         private int started;
 
@@ -323,10 +324,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
             this.rememberMe = rememberMe;
         }
 
+        @Deprecated
         public int getStarted() {
             return started;
         }
 
+        @Deprecated
         public void setStarted(int started) {
             this.started = started;
         }
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
index ced1768..508b817 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
@@ -23,6 +23,7 @@ package org.keycloak.models.session;
 public class PersistentUserSessionModel {
 
     private String userSessionId;
+    private int started;
     private int lastSessionRefresh;
     private boolean offline;
     private String data;
@@ -35,6 +36,14 @@ public class PersistentUserSessionModel {
         this.userSessionId = userSessionId;
     }
 
+    public int getStarted() {
+        return started;
+    }
+
+    public void setStarted(int started) {
+        this.started = started;
+    }
+
     public int getLastSessionRefresh() {
         return lastSessionRefresh;
     }
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
index ba5a595..11c7567 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
@@ -24,6 +24,7 @@ import org.keycloak.models.UserModel;
 import org.keycloak.models.UserSessionModel;
 import org.keycloak.provider.Provider;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -37,8 +38,6 @@ public interface UserSessionPersisterProvider extends Provider {
     // Assuming that corresponding userSession is already persisted
     void createClientSession(AuthenticatedClientSessionModel clientSession, boolean offline);
 
-    void updateUserSession(UserSessionModel userSession, boolean offline);
-
     // Called during logout (for online session) or during periodic expiration. It will remove all corresponding clientSessions too
     void removeUserSession(String userSessionId, boolean offline);
 
@@ -49,14 +48,14 @@ public interface UserSessionPersisterProvider extends Provider {
     void onClientRemoved(RealmModel realm, ClientModel client);
     void onUserRemoved(RealmModel realm, UserModel user);
 
-    // Called at startup to remove userSessions without any clientSession
-    void clearDetachedUserSessions();
+    // Bulk update of lastSessionRefresh of all specified userSessions to the given value.
+    void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline);
 
-    // Update "lastSessionRefresh" of all userSessions and "timestamp" of all clientSessions to specified time
-    void updateAllTimestamps(int time);
+    // Remove userSessions and clientSessions, which are expired
+    void removeExpired(RealmModel realm);
 
     // Called during startup. For each userSession, it loads also clientSessions
-    List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline);
+    List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId);
 
     int getUserSessionsCount(boolean offline);
 
diff --git a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
index fc3c584..1311635 100755
--- a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
+++ b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
@@ -309,7 +309,7 @@ public class TokenManager {
             if (clientSession.getCurrentRefreshToken() != null &&
                     !refreshToken.getId().equals(clientSession.getCurrentRefreshToken()) &&
                     refreshToken.getIssuedAt() < clientSession.getTimestamp() &&
-                    clusterStartupTime != clientSession.getTimestamp()) {
+                    clusterStartupTime <= clientSession.getTimestamp()) {
                 throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
             }
 
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
index e61969a..4ebd599 100755
--- a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
+++ b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
@@ -20,6 +20,7 @@ package org.keycloak.services.scheduled;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.RealmModel;
 import org.keycloak.models.UserSessionProvider;
+import org.keycloak.models.session.UserSessionPersisterProvider;
 import org.keycloak.timer.ScheduledTask;
 
 /**
@@ -35,6 +36,7 @@ public class ClearExpiredUserSessions implements ScheduledTask {
         for (RealmModel realm : session.realms().getRealms()) {
             sessions.removeExpired(realm);
             session.authenticationSessions().removeExpired(realm);
+            session.getProvider(UserSessionPersisterProvider.class).removeExpired(realm);
         }
     }
 
diff --git a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
index fca6e02..7e93627 100644
--- a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
+++ b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
@@ -44,7 +44,7 @@ import org.keycloak.models.UserCredentialModel;
 import org.keycloak.models.UserModel;
 import org.keycloak.models.UserProvider;
 import org.keycloak.models.UserSessionModel;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
 import org.keycloak.models.utils.ModelToRepresentation;
 import org.keycloak.protocol.oidc.OIDCLoginProtocol;
 import org.keycloak.protocol.oidc.mappers.AudienceProtocolMapper;
@@ -695,8 +695,8 @@ public class TestingResourceProvider implements RealmResourceProvider {
     @Produces(MediaType.APPLICATION_JSON)
     public Response suspendPeriodicTasks() {
         suspendTask(ClearExpiredUserSessions.TASK_NAME);
-        suspendTask(LastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
-        suspendTask(LastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
+        suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
+        suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
 
         return Response.noContent().build();
     }
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
index f10ca46..51eb60c 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
@@ -31,8 +31,8 @@ import org.keycloak.common.util.Time;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
 import org.keycloak.models.sessions.infinispan.changes.sessions.SessionData;
 import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
 import org.keycloak.representations.idm.RealmRepresentation;
@@ -69,7 +69,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
 
         @Override
         public void run(KeycloakSession session) {
-            LastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
+            CrossDCLastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
             System.out.println("sss");
 
             int lastSessionRefresh = Time.currentTime();
@@ -113,7 +113,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
         @Override
         public void run(KeycloakSession session) {
             // Long timer interval. No message due the timer wasn't executed
-            LastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
+            CrossDCLastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
             Time.setOffset(100);
 
             try {
@@ -124,7 +124,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
             Assert.assertEquals(0, counter.get());
 
             // Short timer interval 10 ms. 1 message due the interval is executed and lastRun was in the past due to Time.setOffset
-            LastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
+            CrossDCLastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
             Time.setOffset(200);
 
             Retry.execute(() -> {
@@ -152,12 +152,12 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
 
         AtomicInteger counter = new AtomicInteger();
 
-        LastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
-            LastSessionRefreshStoreFactory factory = new LastSessionRefreshStoreFactory() {
+        CrossDCLastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
+            CrossDCLastSessionRefreshStoreFactory factory = new CrossDCLastSessionRefreshStoreFactory() {
 
                 @Override
-                protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
-                    return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
+                protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+                    return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
 
                         @Override
                         protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
index 172dd3f..a95e27f 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
@@ -97,9 +97,9 @@ public class UserSessionInitializerTest {
         List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, testApp, 0, 10);
         UserSessionProviderTest.assertSessions(loadedSessions, origSessions);
 
-        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "test-app", "third-party");
-        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, serverStartTime, "test-app");
-        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, serverStartTime, "test-app");
+        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "test-app", "third-party");
+        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
+        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
     }
 
 
@@ -130,7 +130,7 @@ public class UserSessionInitializerTest {
         List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, thirdparty, 0, 10);
         Assert.assertEquals(1, loadedSessions.size());
 
-        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "third-party");
+        UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "third-party");
 
         // Revert client
         realm.addClient("test-app");
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
index 8a3ffa9..7c10701 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
@@ -22,6 +22,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.common.util.Time;
 import org.keycloak.models.AuthenticatedClientSessionModel;
 import org.keycloak.models.ClientModel;
@@ -37,8 +38,12 @@ import org.keycloak.models.UserManager;
 import org.keycloak.testsuite.rule.KeycloakRule;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
@@ -112,51 +117,6 @@ public class UserSessionPersisterProviderTest {
         assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
     }
 
-    @Test
-    public void testUpdateTimestamps() {
-        // Create some sessions in infinispan
-        int started = Time.currentTime();
-        UserSessionModel[] origSessions = createSessions();
-
-        resetSession();
-
-        // Persist 3 created userSessions and clientSessions as offline
-        ClientModel testApp = realm.getClientByClientId("test-app");
-        List<UserSessionModel> userSessions = session.sessions().getUserSessions(realm, testApp);
-        for (UserSessionModel userSession : userSessions) {
-            persistUserSession(userSession, true);
-        }
-
-        // Persist 1 online session
-        UserSessionModel userSession = session.sessions().getUserSession(realm, origSessions[0].getId());
-        persistUserSession(userSession, false);
-
-        resetSession();
-
-        // update timestamps
-        int newTime = started + 50;
-        persister.updateAllTimestamps(newTime);
-
-        // Assert online session
-        List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(false, 1, 1, 1);
-        Assert.assertEquals(2, assertTimestampsUpdated(loadedSessions, newTime));
-
-        // Assert offline sessions
-        loadedSessions = loadPersistedSessionsPaginated(true, 2, 2, 3);
-        Assert.assertEquals(4, assertTimestampsUpdated(loadedSessions, newTime));
-    }
-
-    private int assertTimestampsUpdated(List<UserSessionModel> loadedSessions, int expectedTime) {
-        int clientSessionsCount = 0;
-        for (UserSessionModel loadedSession : loadedSessions) {
-            Assert.assertEquals(expectedTime, loadedSession.getLastSessionRefresh());
-            for (AuthenticatedClientSessionModel clientSession : loadedSession.getAuthenticatedClientSessions().values()) {
-                Assert.assertEquals(expectedTime, clientSession.getTimestamp());
-                clientSessionsCount++;
-            }
-        }
-        return clientSessionsCount;
-    }
 
     @Test
     public void testUpdateAndRemove() {
@@ -177,48 +137,30 @@ public class UserSessionPersisterProviderTest {
         UserSessionModel persistedSession = loadedSessions.get(0);
         UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
 
-        // Update userSession
-        Time.setOffset(10);
-        try {
-            persistedSession.setLastSessionRefresh(Time.currentTime());
-            persistedSession.setNote("foo", "bar");
-            persistedSession.setState(UserSessionModel.State.LOGGED_IN);
-            persister.updateUserSession(persistedSession, true);
-
-            // create new clientSession
-            AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
-                    "http://redirect", "state");
-            persister.createClientSession(clientSession, true);
+        // create new clientSession
+        AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
+                "http://redirect", "state");
+        persister.createClientSession(clientSession, true);
 
-            resetSession();
-
-            // Assert session updated
-            loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
-            persistedSession = loadedSessions.get(0);
-            UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started+10, "test-app", "third-party");
-            Assert.assertEquals("bar", persistedSession.getNote("foo"));
-            Assert.assertEquals(UserSessionModel.State.LOGGED_IN, persistedSession.getState());
+        resetSession();
 
-            // Remove clientSession
-            persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
+        // Remove clientSession
+        persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
 
-            resetSession();
+        resetSession();
 
-            // Assert clientSession removed
-            loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
-            persistedSession = loadedSessions.get(0);
-            UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started + 10, "test-app");
+        // Assert clientSession removed
+        loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
+        persistedSession = loadedSessions.get(0);
+        UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started , "test-app");
 
-            // Remove userSession
-            persister.removeUserSession(persistedSession.getId(), true);
+        // Remove userSession
+        persister.removeUserSession(persistedSession.getId(), true);
 
-            resetSession();
+        resetSession();
 
-            // Assert nothing found
-            loadPersistedSessionsPaginated(true, 10, 0, 0);
-        } finally {
-            Time.setOffset(0);
-        }
+        // Assert nothing found
+        loadPersistedSessionsPaginated(true, 10, 0, 0);
     }
 
     @Test
@@ -302,8 +244,8 @@ public class UserSessionPersisterProviderTest {
 
         resetSession();
 
-        // Assert nothing loaded - userSession was removed as well because it was last userSession
-        loadPersistedSessionsPaginated(true, 10, 0, 0);
+        // Assert loading still works - last userSession is still there, but no clientSession on it
+        loadPersistedSessionsPaginated(true, 10, 1, 1);
 
         // Cleanup
         realmMgr = new RealmManager(session);
@@ -340,23 +282,108 @@ public class UserSessionPersisterProviderTest {
         UserSessionModel persistedSession = loadedSessions.get(0);
         UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
 
-        // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly"
+        // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly".
+        // No exception will happen. However session will be still there
         UserModel user2 = session.users().getUserByUsername("user2", realm);
         session.users().removeUser(realm, user2);
 
-        loadedSessions = loadPersistedSessionsPaginated(true, 10, 0, 0);
+        loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
 
+        // Cleanup
+        UserSessionModel userSession = loadedSessions.get(0);
+        session.sessions().removeUserSession(realm, userSession);
+        persister.removeUserSession(userSession.getId(), userSession.isOffline());
     }
 
     // KEYCLOAK-1999
     @Test
     public void testNoSessions() {
         UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
-        List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true);
+        List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true, 0, "abc");
         Assert.assertEquals(0, sessions.size());
     }
 
 
+    @Test
+    public void testMoreSessions() {
+        // Create 10 userSessions - each having 1 clientSession
+        List<UserSessionModel> userSessions = new ArrayList<>();
+        UserModel user = session.users().getUserByUsername("user1", realm);
+
+        for (int i=0 ; i<20 ; i++) {
+            // Having different offsets for each session (to ensure that lastSessionRefresh is also different)
+            Time.setOffset(i);
+
+            UserSessionModel userSession = session.sessions().createUserSession(realm, user, "user1", "127.0.0.1", "form", true, null, null);
+            createClientSession(realm.getClientByClientId("test-app"), userSession, "http://redirect", "state");
+            userSessions.add(userSession);
+        }
+
+        resetSession();
+
+        for (UserSessionModel userSession : userSessions) {
+            UserSessionModel userSession2 = session.sessions().getUserSession(realm, userSession.getId());
+            persistUserSession(userSession2, true);
+        }
+
+        resetSession();
+
+        List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 2, 10, 20);
+        user = session.users().getUserByUsername("user1", realm);
+        ClientModel testApp = realm.getClientByClientId("test-app");
+
+        for (UserSessionModel loadedSession : loadedSessions) {
+            assertEquals(user.getId(), loadedSession.getUser().getId());
+            assertEquals("127.0.0.1", loadedSession.getIpAddress());
+            assertEquals(user.getUsername(), loadedSession.getLoginUsername());
+
+            assertEquals(1, loadedSession.getAuthenticatedClientSessions().size());
+            assertTrue(loadedSession.getAuthenticatedClientSessions().containsKey(testApp.getId()));
+        }
+    }
+
+
+    @Test
+    public void testExpiredSessions() {
+        // Create some sessions in infinispan
+        int started = Time.currentTime();
+        UserSessionModel[] origSessions = createSessions();
+
+        resetSession();
+
+        // Persist 2 offline sessions of 2 users
+        UserSessionModel userSession1 = session.sessions().getUserSession(realm, origSessions[1].getId());
+        UserSessionModel userSession2 = session.sessions().getUserSession(realm, origSessions[2].getId());
+        persistUserSession(userSession1, true);
+        persistUserSession(userSession2, true);
+
+        resetSession();
+
+        // Update one of the sessions with lastSessionRefresh of 20 days ahead
+        int lastSessionRefresh = Time.currentTime() + 1728000;
+        persister.updateLastSessionRefreshes(realm, lastSessionRefresh, Collections.singleton(userSession1.getId()), true);
+
+        resetSession();
+
+        // Increase time offset - 40 days
+        Time.setOffset(3456000);
+        try {
+            // Run expiration thread
+            persister.removeExpired(realm);
+
+            // Test the updated session is still in persister. Not updated session is not there anymore
+            List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
+            UserSessionModel persistedSession = loadedSessions.get(0);
+            UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, lastSessionRefresh, "test-app");
+
+        } finally {
+            // Cleanup
+            Time.setOffset(0);
+        }
+
+    }
+
+
     private AuthenticatedClientSessionModel createClientSession(ClientModel client, UserSessionModel userSession, String redirect, String state) {
         AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, client, userSession);
         clientSession.setRedirectUri(redirect);
@@ -407,19 +434,32 @@ public class UserSessionPersisterProviderTest {
     private List<UserSessionModel> loadPersistedSessionsPaginated(boolean offline, int sessionsPerPage, int expectedPageCount, int expectedSessionsCount) {
         int count = persister.getUserSessionsCount(offline);
 
-        int start = 0;
+
         int pageCount = 0;
         boolean next = true;
         List<UserSessionModel> result = new ArrayList<>();
-        while (next && start < count) {
-            List<UserSessionModel> sess = persister.loadUserSessions(start, sessionsPerPage, offline);
-            if (sess.size() == 0) {
+        int lastCreatedOn = 0;
+        String lastSessionId = "abc";
+
+        while (next) {
+            List<UserSessionModel> sess = persister.loadUserSessions(0, sessionsPerPage, offline, lastCreatedOn, lastSessionId);
+
+            if (sess.size() < sessionsPerPage) {
                 next = false;
+
+                // We had at least some session
+                if (sess.size() > 0) {
+                    pageCount++;
+                }
             } else {
                 pageCount++;
-                start += sess.size();
-                result.addAll(sess);
+
+                UserSessionModel lastSession = sess.get(sess.size() - 1);
+                lastCreatedOn = lastSession.getStarted();
+                lastSessionId = lastSession.getId();
             }
+
+            result.addAll(sess);
         }
 
         Assert.assertEquals(pageCount, expectedPageCount);
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
index 4f3ca20..381de75 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
@@ -363,24 +363,45 @@ public class UserSessionProviderOfflineTest {
         // sessions are in persister too
         Assert.assertEquals(3, persister.getUserSessionsCount(true));
 
-        // Set lastSessionRefresh to session[0] to 0
-        session0.setLastSessionRefresh(0);
+        // Increase timeOffset - 5 minutes
+        Time.setOffset(300);
+        try {
 
-        resetSession();
+            // Update lastSessionRefresh of session0. This will update lastSessionRefresh of all the sessions to DB as they were not yet updated to DB
+            session0.setLastSessionRefresh(Time.currentTime());
 
-        session.sessions().removeExpired(realm);
+            resetSession();
 
-        resetSession();
+            // Increase timeOffset - 20 days
+            Time.setOffset(1728000);
 
-        // assert session0 not found now
-        Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
+            session0 = session.sessions().getOfflineUserSession(realm, origSessions[0].getId());
+            session0.setLastSessionRefresh(Time.currentTime());
 
-        Assert.assertEquals(2, persister.getUserSessionsCount(true));
+            resetSession();
+
+            // Increase timeOffset - 40 days
+            Time.setOffset(3456000);
+
+            // Expire and ensure that all sessions despite session0 were removed
+
+            session.sessions().removeExpired(realm);
+            persister.removeExpired(realm);
+
+            resetSession();
+
+            // assert session0 is the only session found
+            Assert.assertNotNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
+            Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[1].getId()));
+            Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[2].getId()));
+
+            Assert.assertEquals(1, persister.getUserSessionsCount(true));
+
+            // Expire everything and assert nothing found
+            Time.setOffset(6000000);
 
-        // Expire everything and assert nothing found
-        Time.setOffset(3000000);
-        try {
             session.sessions().removeExpired(realm);
+            persister.removeExpired(realm);
 
             resetSession();
 
diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
index 0c1a506..796e3c6 100644
--- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
+++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
@@ -17,9 +17,17 @@
 
 package org.keycloak.testsuite.util.cli;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.keycloak.models.KeycloakSession;
-import org.keycloak.models.UserSessionProvider;
-import org.keycloak.models.UserSessionProviderFactory;
+import org.keycloak.models.KeycloakSessionFactory;
+import org.keycloak.models.UserSessionModel;
+import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.KeycloakModelUtils;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -33,15 +41,97 @@ public class LoadPersistentSessionsCommand extends AbstractCommand {
 
     @Override
     protected void doRunCommand(KeycloakSession session) {
-        int sessionsPerSegment = getIntArg(0);
-        UserSessionProviderFactory sessionProviderFactory = (UserSessionProviderFactory) sessionFactory.getProviderFactory(UserSessionProvider.class);
-        sessionProviderFactory.loadPersistentSessions(sessionFactory, 10, sessionsPerSegment);
+        final int workersCount = getIntArg(0);
+        final int limit = getIntArg(1);
+        //int workersCount = 8;
+        //int limit = 64;
+
+        AtomicInteger lastCreatedOn = new AtomicInteger(0);
+        AtomicReference<String> lastSessionId = new AtomicReference<>("abc");
+
+        AtomicBoolean finished = new AtomicBoolean(false);
+        int i=0;
+
+        while (!finished.get()) {
+            if (i % 16 == 0) {
+                log.infof("Starting iteration: %s . lastCreatedOn: %d, lastSessionId: %s", i, lastCreatedOn.get(), lastSessionId.get());
+            }
+
+            i = i + workersCount;
+            List<Thread> workers = new LinkedList<>();
+            MyWorker lastWorker = null;
+
+            for (int workerId = 0 ; workerId < workersCount ; workerId++) {
+                lastWorker = new MyWorker(workerId, lastCreatedOn.get(), lastSessionId.get(), limit, sessionFactory);
+                Thread worker = new Thread(lastWorker);
+                workers.add(worker);
+            }
+
+            for (Thread worker : workers) {
+                worker.start();
+            }
+            for (Thread worker : workers) {
+                try {
+                    worker.join();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            List<UserSessionModel> lastWorkerSessions = lastWorker.getLoadedSessions();
+
+            if (lastWorkerSessions.size() < limit) {
+                finished.set(true);
+            } else {
+                UserSessionModel lastSession = lastWorkerSessions.get(lastWorkerSessions.size() - 1);
+                lastCreatedOn.set(lastSession.getStarted());
+                lastSessionId.set(lastSession.getId());
+            }
+
+
+        }
 
         log.info("All persistent sessions loaded successfully");
     }
 
     @Override
     public String printUsage() {
-        return super.printUsage() + " <sessions-count-per-segment>";
+        return super.printUsage() + " <workers-count (for example 8)> <limit (for example 64)>";
+    }
+
+
+    private class MyWorker implements Runnable {
+
+        private final int workerId;
+        private final int lastCreatedOn;
+        private final String lastSessionId;
+        private final int limit;
+        private final KeycloakSessionFactory sessionFactory;
+
+        private List<UserSessionModel> loadedSessions = new LinkedList<>();
+
+        public MyWorker(int workerId, int lastCreatedOn, String lastSessionId, int limit, KeycloakSessionFactory sessionFactory) {
+            this.workerId = workerId;
+            this.lastCreatedOn = lastCreatedOn;
+            this.lastSessionId = lastSessionId;
+            this.limit = limit;
+            this.sessionFactory = sessionFactory;
+        }
+
+        @Override
+        public void run() {
+            KeycloakModelUtils.runJobInTransaction(sessionFactory, (keycloakSession) -> {
+                int offset = workerId * limit;
+
+                UserSessionPersisterProvider persister = keycloakSession.getProvider(UserSessionPersisterProvider.class);
+                loadedSessions = persister.loadUserSessions(offset, limit, true, lastCreatedOn, lastSessionId);
+
+            });
+        }
+
+
+        private List<UserSessionModel> getLoadedSessions() {
+            return loadedSessions;
+        }
     }
 }
diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
index 1e4cb48..6b8dd25 100644
--- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
+++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
@@ -19,6 +19,7 @@ package org.keycloak.testsuite.util.cli;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.keycloak.models.AuthenticatedClientSessionModel;
 import org.keycloak.models.ClientModel;
@@ -36,6 +37,8 @@ import org.keycloak.models.utils.KeycloakModelUtils;
  */
 public class PersistSessionsCommand extends AbstractCommand {
 
+    private AtomicInteger userCounter = new AtomicInteger();
+
     @Override
     public String getName() {
         return "persistSessions";
@@ -75,12 +78,18 @@ public class PersistSessionsCommand extends AbstractCommand {
             @Override
             public void run(KeycloakSession session) {
                 RealmModel realm = session.realms().getRealmByName("master");
-                UserModel john = session.users().getUserByUsername("admin", realm);
+
                 ClientModel testApp = realm.getClientByClientId("security-admin-console");
                 UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
 
                 for (int i = 0; i < countInThisBatch; i++) {
-                    UserSessionModel userSession = session.sessions().createUserSession(realm, john, "john-doh@localhost", "127.0.0.2", "form", true, null, null);
+                    String username = "john-" + userCounter.incrementAndGet();
+                    UserModel john = session.users().getUserByUsername(username, realm);
+                    if (john == null) {
+                        john = session.users().addUser(realm, username);
+                    }
+
+                    UserSessionModel userSession = session.sessions().createUserSession(realm, john, username, "127.0.0.2", "form", true, null, null);
                     AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, testApp, userSession);
                     clientSession.setRedirectUri("http://redirect");
                     clientSession.setNote("foo", "bar-" + i);