keycloak-aplcache
Changes
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java 5(+2 -3)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStore.java 32(+10 -22)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStoreFactory.java 34(+2 -32)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshChecker.java 17(+8 -9)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshListener.java 6(+3 -3)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java 58(+58 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java 63(+63 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java 77(+77 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java 49(+49 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java 192(+153 -39)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java 21(+14 -7)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java 73(+31 -42)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentLoaderContext.java 22(+9 -13)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java 69(+47 -22)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java 42(+42 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java 44(+44 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java 24(+13 -11)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java 108(+101 -7)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java 29(+21 -8)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java 25(+8 -17)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java 16(+14 -2)
model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java 3(+2 -1)
model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java 9(+8 -1)
model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java 248(+111 -137)
model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java 11(+5 -6)
model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java 54(+54 -0)
model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java 138(+68 -70)
server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java 12(+4 -8)
server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java 7(+5 -2)
server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java 9(+9 -0)
server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java 13(+6 -7)
testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java 6(+3 -3)
testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java 18(+9 -9)
testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java 8(+4 -4)
testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java 222(+131 -91)
testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java 43(+32 -11)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
index 6551bf8..a76639d 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java
@@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@@ -33,7 +32,7 @@ import org.keycloak.models.sessions.infinispan.changes.ClientSessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;
@@ -150,7 +149,7 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
@Override
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<AuthenticatedClientSessionEntity> sessionWrapper) {
- return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
+ return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java
new file mode 100644
index 0000000..81669d9
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.models.KeycloakSession;
+
+/**
+ * Cross-DC based CrossDCLastSessionRefreshStore
+ *
+ * Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update
+ * lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class CrossDCLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
+
+ protected static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshStore.class);
+
+ private final String eventKey;
+
+ protected CrossDCLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+ super(maxIntervalBetweenMessagesSeconds, maxCount);
+ this.eventKey = eventKey;
+ }
+
+
+ protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
+ LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
+
+ if (logger.isDebugEnabled()) {
+ logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
+ }
+
+ // Don't notify local DC about the lastSessionRefreshes. They were processed here already
+ ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
+ cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC);
+ }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java
new file mode 100644
index 0000000..b0d5702
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import org.infinispan.Cache;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class CrossDCLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
+
+ // Name of periodic tasks to send events to the other DCs
+ public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes";
+ public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline";
+
+
+ public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
+ return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
+ }
+
+
+ public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache,
+ long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+ String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME : LSR_PERIODIC_TASK_NAME;
+ CrossDCLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
+
+ // Register listener
+ ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
+ cluster.registerListener(eventKey, new CrossDCLastSessionRefreshListener(kcSession, cache, offline));
+
+ // Setup periodic timer check
+ setupPeriodicTimer(kcSession, store, timerIntervalMs, eventKey);
+
+ return store;
+ }
+
+
+ protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+ return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
+ }
+
+
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java
new file mode 100644
index 0000000..cecc417
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.jboss.logging.Logger;
+import org.keycloak.common.util.Time;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.RealmModel;
+import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.SessionTimeoutHelper;
+
+/**
+ * The store is supposed to do periodic bulk update of lastSessionRefresh times of all userSessions, which were refreshed during some period
+ * of time. The updates are sent to UserSessionPersisterProvider (DB)
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class PersisterLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
+
+ protected static final Logger logger = Logger.getLogger(PersisterLastSessionRefreshStore.class);
+
+ private final boolean offline;
+
+ protected PersisterLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+ super(maxIntervalBetweenMessagesSeconds, maxCount);
+ this.offline = offline;
+ }
+
+
+ protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
+ Map<String, Set<String>> sessionIdsByRealm =
+ refreshesToSend.entrySet().stream().collect(
+ Collectors.groupingBy(entry -> entry.getValue().getRealmId(),
+ Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
+
+ // Update DB with a bit lower value than current time to ensure 'revokeRefreshToken' will work correctly taking server
+ int lastSessionRefresh = Time.currentTime() - SessionTimeoutHelper.PERIODIC_TASK_INTERVAL_SECONDS;
+
+ if (logger.isDebugEnabled()) {
+ logger.debugf("Updating %d userSessions with lastSessionRefresh: %d", refreshesToSend.size(), lastSessionRefresh);
+ }
+
+ UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
+
+ for (Map.Entry<String, Set<String>> entry : sessionIdsByRealm.entrySet()) {
+ RealmModel realm = kcSession.realms().getRealm(entry.getKey());
+
+ // Case when realm was deleted in the meantime. UserSessions were already deleted as well (callback for realm deletion)
+ if (realm == null) {
+ continue;
+ }
+
+ Set<String> userSessionIds = entry.getValue();
+
+ persister.updateLastSessionRefreshes(realm, lastSessionRefresh, userSessionIds, offline);
+ }
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java
new file mode 100644
index 0000000..d720ef8
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.sessions;
+
+import org.infinispan.Cache;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class PersisterLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
+
+ public PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession, boolean offline) {
+ return createAndInit(kcSession, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
+ }
+
+
+ private PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession,
+ long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+ PersisterLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, offline);
+
+ // Setup periodic timer check
+ setupPeriodicTimer(kcSession, store, timerIntervalMs, "db-last-session-refresh");
+
+ return store;
+ }
+
+
+ protected PersisterLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
+ return new PersisterLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, offline);
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index bf7b04f..b1429a6 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -19,10 +19,13 @@ package org.keycloak.models.sessions.infinispan;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.stream.CacheCollectors;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.common.util.ObjectUtil;
+import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@@ -35,7 +38,8 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
@@ -61,6 +65,8 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
import org.keycloak.models.utils.SessionTimeoutHelper;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -69,11 +75,11 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
-import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -100,15 +106,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
protected final SessionEventsSenderTransaction clusterEventsSenderTx;
- protected final LastSessionRefreshStore lastSessionRefreshStore;
- protected final LastSessionRefreshStore offlineLastSessionRefreshStore;
+ protected final CrossDCLastSessionRefreshStore lastSessionRefreshStore;
+ protected final CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
+ protected final PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
+ protected final RemoteCacheInvoker remoteCacheInvoker;
protected final InfinispanKeyGenerator keyGenerator;
public InfinispanUserSessionProvider(KeycloakSession session,
RemoteCacheInvoker remoteCacheInvoker,
- LastSessionRefreshStore lastSessionRefreshStore,
- LastSessionRefreshStore offlineLastSessionRefreshStore,
+ CrossDCLastSessionRefreshStore lastSessionRefreshStore,
+ CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore,
+ PersisterLastSessionRefreshStore persisterLastSessionRefreshStore,
InfinispanKeyGenerator keyGenerator,
Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
@@ -134,6 +143,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
this.lastSessionRefreshStore = lastSessionRefreshStore;
this.offlineLastSessionRefreshStore = offlineLastSessionRefreshStore;
+ this.persisterLastSessionRefreshStore = persisterLastSessionRefreshStore;
+ this.remoteCacheInvoker = remoteCacheInvoker;
this.keyGenerator = keyGenerator;
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
@@ -160,14 +171,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return offline ? offlineClientSessionTx : clientSessionTx;
}
- protected LastSessionRefreshStore getLastSessionRefreshStore() {
+ protected CrossDCLastSessionRefreshStore getLastSessionRefreshStore() {
return lastSessionRefreshStore;
}
- protected LastSessionRefreshStore getOfflineLastSessionRefreshStore() {
+ protected CrossDCLastSessionRefreshStore getOfflineLastSessionRefreshStore() {
return offlineLastSessionRefreshStore;
}
+ protected PersisterLastSessionRefreshStore getPersisterLastSessionRefreshStore() {
+ return persisterLastSessionRefreshStore;
+ }
+
@Override
public AuthenticatedClientSessionModel createClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession) {
final UUID clientSessionId = keyGenerator.generateKeyUUID(session, clientSessionCache);
@@ -535,7 +550,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeExpiredOfflineUserSessions(RealmModel realm) {
- UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
@@ -570,8 +584,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
futures.addTask(f);
});
- // TODO:mposolda can be likely optimized to delete all expired at one step
- persister.removeUserSession( userSessionEntity.getId(), true);
}
});
@@ -796,7 +808,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public UserSessionModel createOfflineUserSession(UserSessionModel userSession) {
- UserSessionAdapter offlineUserSession = importUserSession(userSession, true, false);
+ UserSessionAdapter offlineUserSession = importUserSession(userSession, true);
// started and lastSessionRefresh set to current time
int currentTime = Time.currentTime();
@@ -866,8 +878,117 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return getUserSessions(realm, client, first, max, true);
}
+
@Override
- public UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline, boolean importAuthenticatedClientSessions) {
+ public void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {
+ if (persistentUserSessions == null || persistentUserSessions.isEmpty()) {
+ return;
+ }
+
+ Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsById = new HashMap<>();
+
+ Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsById = persistentUserSessions.stream()
+ .map((UserSessionModel persistentUserSession) -> {
+
+ UserSessionEntity userSessionEntityToImport = createUserSessionEntityInstance(persistentUserSession);
+
+ for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
+ String clientUUID = entry.getKey();
+ AuthenticatedClientSessionModel clientSession = entry.getValue();
+ AuthenticatedClientSessionEntity clientSessionToImport = createAuthenticatedClientSessionInstance(clientSession, userSessionEntityToImport.getRealmId(), offline);
+
+ // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
+ clientSessionToImport.setTimestamp(userSessionEntityToImport.getLastSessionRefresh());
+
+ clientSessionsById.put(clientSessionToImport.getId(), new SessionEntityWrapper<>(clientSessionToImport));
+
+ // Update userSession entity with the clientSession
+ AuthenticatedClientSessionStore clientSessions = userSessionEntityToImport.getAuthenticatedClientSessions();
+ clientSessions.put(clientUUID, clientSessionToImport.getId());
+ }
+
+ return userSessionEntityToImport;
+ })
+ .map(SessionEntityWrapper::new)
+ .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+ // Directly put all entities to the infinispan cache
+ Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoaders(getCache(offline));
+ cache.putAll(sessionsById);
+
+ // put all entities to the remoteCache (if exists)
+ RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
+ if (remoteCache != null) {
+ Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsByIdForTransport = sessionsById.values().stream()
+ .map(SessionEntityWrapper::forTransport)
+ .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+ Retry.executeWithBackoff((int iteration) -> {
+
+ try {
+ remoteCache.putAll(sessionsByIdForTransport);
+ } catch (HotRodClientException re) {
+ if (log.isDebugEnabled()) {
+ log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
+ sessionsByIdForTransport.size(), iteration);
+ }
+
+ // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
+ throw re;
+ }
+
+ }, 10, 10);
+ }
+
+ // Import client sessions
+ Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache;
+ clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache);
+
+ clientSessCache.putAll(clientSessionsById);
+
+ // put all entities to the remoteCache (if exists)
+ RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
+ if (remoteCacheClientSessions != null) {
+ Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> sessionsByIdForTransport = clientSessionsById.values().stream()
+ .map(SessionEntityWrapper::forTransport)
+ .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
+
+ Retry.executeWithBackoff((int iteration) -> {
+
+ try {
+ remoteCacheClientSessions.putAll(sessionsByIdForTransport);
+ } catch (HotRodClientException re) {
+ if (log.isDebugEnabled()) {
+ log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task",
+ sessionsByIdForTransport.size(), iteration);
+ }
+
+ // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
+ throw re;
+ }
+
+ }, 10, 10);
+ }
+ }
+
+
+ // Imports just userSession without it's clientSessions
+ protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) {
+ UserSessionEntity entity = createUserSessionEntityInstance(userSession);
+
+ InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
+ InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
+
+ SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
+ userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
+
+ UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
+
+ return importedSession;
+ }
+
+
+ private UserSessionEntity createUserSessionEntityInstance(UserSessionModel userSession) {
UserSessionEntity entity = new UserSessionEntity();
entity.setId(userSession.getId());
entity.setRealmId(userSession.getRealm().getId());
@@ -896,22 +1017,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
entity.setStarted(userSession.getStarted());
entity.setLastSessionRefresh(userSession.getLastSessionRefresh());
- InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
- InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
-
- SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
- userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
-
- UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
-
- // Handle client sessions
- if (importAuthenticatedClientSessions) {
- for (AuthenticatedClientSessionModel clientSession : userSession.getAuthenticatedClientSessions().values()) {
- importClientSession(importedSession, clientSession, userSessionUpdateTx, clientSessionUpdateTx, offline);
- }
- }
-
- return importedSession;
+ return entity;
}
@@ -919,16 +1025,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
boolean offline) {
- final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
- AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
- entity.setRealmId(sessionToImportInto.getRealm().getId());
-
- entity.setAction(clientSession.getAction());
- entity.setAuthMethod(clientSession.getProtocol());
-
- entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
- entity.setRedirectUri(clientSession.getRedirectUri());
- entity.setTimestamp(clientSession.getTimestamp());
+ AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(clientSession, sessionToImportInto.getRealm().getId(), offline);
+ final UUID clientSessionId = entity.getId();
SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
clientSessionUpdateTx.addTask(entity.getId(), createClientSessionTask, entity);
@@ -942,6 +1040,22 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return new AuthenticatedClientSessionAdapter(session,this, entity, clientSession.getClient(), sessionToImportInto, userSessionUpdateTx, clientSessionUpdateTx, offline);
}
+
+ private AuthenticatedClientSessionEntity createAuthenticatedClientSessionInstance(AuthenticatedClientSessionModel clientSession, String realmId, boolean offline) {
+ final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
+ AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
+ entity.setRealmId(realmId);
+
+ entity.setAction(clientSession.getAction());
+ entity.setAuthMethod(clientSession.getProtocol());
+
+ entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
+ entity.setRedirectUri(clientSession.getRedirectUri());
+ entity.setTimestamp(clientSession.getTimestamp());
+
+ return entity;
+ }
+
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
private final String clientUuid;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
index 15d8c25..a59706a 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
@@ -32,8 +32,10 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.initializer.CacheInitializer;
import org.keycloak.models.sessions.infinispan.initializer.DBLockBasedCacheInitializer;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
@@ -80,8 +82,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
private Config.Scope config;
private RemoteCacheInvoker remoteCacheInvoker;
- private LastSessionRefreshStore lastSessionRefreshStore;
- private LastSessionRefreshStore offlineLastSessionRefreshStore;
+ private CrossDCLastSessionRefreshStore lastSessionRefreshStore;
+ private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
+ private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
private InfinispanKeyGenerator keyGenerator;
@Override
@@ -93,7 +96,8 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);
- return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore, keyGenerator,
+ return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore,
+ persisterLastSessionRefreshStore, keyGenerator,
cache, offlineSessionsCache, clientSessionCache, offlineClientSessionsCache, loginFailures);
}
@@ -169,6 +173,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
initializer.initCache();
initializer.loadSessions();
+
+ // Initialize persister for periodically doing bulk DB updates of lastSessionRefresh timestamps of refreshed sessions
+ persisterLastSessionRefreshStore = new PersisterLastSessionRefreshStoreFactory().createAndInit(session, true);
}
});
@@ -233,7 +240,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
});
if (sessionsRemoteCache) {
- lastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
+ lastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
}
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsCache = ispn.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
@@ -248,7 +255,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
});
if (offlineSessionsRemoteCache) {
- offlineLastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
+ offlineLastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
}
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = ispn.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
index 8dbcc20..afbd37e 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java
@@ -84,7 +84,6 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
});
state = new InitializerState(ctx[0].getSegmentsCount());
- saveStateToCache(state);
} else {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
@@ -102,7 +101,7 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
}
- protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext ctx) {
+ protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) {
// Assume each worker has same processor's count
int processors = Runtime.getRuntime().availableProcessors();
@@ -114,6 +113,8 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
int errors = 0;
try {
+ List<SessionLoader.WorkerResult> previousResults = new LinkedList<>();
+
while (!state.isFinished()) {
int nodesCount = transport==null ? 1 : transport.getMembers().size();
int distributedWorkersCount = processors * nodesCount;
@@ -126,34 +127,43 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
log.trace("unfinished segments for this iteration: " + segments);
}
- List<Future<WorkerResult>> futures = new LinkedList<>();
+ List<Future<SessionLoader.WorkerResult>> futures = new LinkedList<>();
+
+ int workerId = 0;
for (Integer segment : segments) {
+ SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, workerId, previousResults);
+
SessionInitializerWorker worker = new SessionInitializerWorker();
- worker.setWorkerEnvironment(segment, ctx, sessionLoader);
+ worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader);
+
if (!distributed) {
worker.setEnvironment(workCache, null);
}
- Future<WorkerResult> future = executorService.submit(worker);
+ Future<SessionLoader.WorkerResult> future = executorService.submit(worker);
futures.add(future);
+
+ workerId++;
}
- for (Future<WorkerResult> future : futures) {
+ boolean anyFailure = false;
+ for (Future<SessionLoader.WorkerResult> future : futures) {
try {
- WorkerResult result = future.get();
+ SessionLoader.WorkerResult result = future.get();
+ previousResults.add(result);
- if (result.getSuccess()) {
- int computedSegment = result.getSegment();
- state.markSegmentFinished(computedSegment);
- } else {
+ if (!result.isSuccess()) {
if (log.isTraceEnabled()) {
log.tracef("Segment %d failed to compute", result.getSegment());
}
+ anyFailure = true;
}
} catch (InterruptedException ie) {
+ anyFailure = true;
errors++;
log.error("Interruped exception when computed future. Errors: " + errors, ie);
} catch (ExecutionException ee) {
+ anyFailure = true;
errors++;
log.error("ExecutionException when computed future. Errors: " + errors, ee);
}
@@ -163,11 +173,19 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details");
}
- saveStateToCache(state);
+ // Save just if no error happened. Otherwise re-compute
+ if (!anyFailure) {
+ for (SessionLoader.WorkerResult result : previousResults) {
+ state.markSegmentFinished(result.getSegment());
+ }
- log.debugf("New initializer state pushed. The state is: %s", state);
+ log.debugf("New initializer state is: %s", state);
+ }
}
+ // Push the state after computation is finished
+ saveStateToCache(state);
+
// Loader callback after the task is finished
this.sessionLoader.afterAllSessionsLoaded(this);
@@ -179,33 +197,4 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
}
}
-
- public static class WorkerResult implements Serializable {
-
- private Integer segment;
- private Boolean success;
-
- public static WorkerResult create (Integer segment, boolean success) {
- WorkerResult res = new WorkerResult();
- res.setSegment(segment);
- res.setSuccess(success);
- return res;
- }
-
- public Integer getSegment() {
- return segment;
- }
-
- public void setSegment(Integer segment) {
- this.segment = segment;
- }
-
- public Boolean getSuccess() {
- return success;
- }
-
- public void setSuccess(Boolean success) {
- this.success = success;
- }
- }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
index cf62230..74ad20a 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java
@@ -21,8 +21,8 @@ import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.jboss.logging.Logger;
-import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Retry;
+import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
@@ -33,7 +33,11 @@ import java.util.List;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentUserSessionLoaderContext>, Serializable {
+public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentLoaderContext,
+ OfflinePersistentWorkerContext, OfflinePersistentWorkerResult>, Serializable {
+
+ // Placeholder String used in the searching conditions to identify very first session
+ private static final String FIRST_SESSION_ID = "000";
private static final Logger log = Logger.getLogger(OfflinePersistentUserSessionLoader.class);
@@ -53,46 +57,67 @@ public class OfflinePersistentUserSessionLoader implements SessionLoader<Offline
@Override
public void init(KeycloakSession session) {
- UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
+ }
- // TODO: check if update of timestamps in persister can be skipped entirely
- int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
- log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
+ @Override
+ public OfflinePersistentLoaderContext computeLoaderContext(KeycloakSession session) {
+ UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
+ int sessionsCount = persister.getUserSessionsCount(true);
- persister.clearDetachedUserSessions();
- persister.updateAllTimestamps(clusterStartupTime);
+ return new OfflinePersistentLoaderContext(sessionsCount, sessionsPerSegment);
}
@Override
- public OfflinePersistentUserSessionLoaderContext computeLoaderContext(KeycloakSession session) {
- UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
- int sessionsCount = persister.getUserSessionsCount(true);
+ public OfflinePersistentWorkerContext computeWorkerContext(OfflinePersistentLoaderContext loaderCtx, int segment, int workerId, List<OfflinePersistentWorkerResult> previousResults) {
+ int lastCreatedOn;
+ String lastSessionId;
+ if (previousResults.isEmpty()) {
+ lastCreatedOn = 0;
+ lastSessionId = FIRST_SESSION_ID;
+ } else {
+ OfflinePersistentWorkerResult lastResult = previousResults.get(previousResults.size() - 1);
+ lastCreatedOn = lastResult.getLastCreatedOn();
+ lastSessionId = lastResult.getLastSessionId();
+ }
- return new OfflinePersistentUserSessionLoaderContext(sessionsCount, sessionsPerSegment);
+ // We know the last loaded session. New workers iteration will start from this place
+ return new OfflinePersistentWorkerContext(segment, workerId, lastCreatedOn, lastSessionId);
}
@Override
- public boolean loadSessions(KeycloakSession session, OfflinePersistentUserSessionLoaderContext ctx, int segment) {
- int first = ctx.getSessionsPerSegment() * segment;
- int max = sessionsPerSegment;
+ public OfflinePersistentWorkerResult createFailedWorkerResult(OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext workerContext) {
+ return new OfflinePersistentWorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId(), -1, FIRST_SESSION_ID);
+ }
- if (log.isTraceEnabled()) {
- log.tracef("Loading sessions - first: %d, max: %d", first, max);
- }
+
+ @Override
+ public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) {
+ int first = ctx.getWorkerId() * sessionsPerSegment;
+
+ log.tracef("Loading sessions for segment: %d", ctx.getSegment());
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
- List<UserSessionModel> sessions = persister.loadUserSessions(first, max, true);
+ List<UserSessionModel> sessions = persister.loadUserSessions(first, sessionsPerSegment, true, ctx.getLastCreatedOn(), ctx.getLastSessionId());
+
+ log.tracef("Sessions loaded from DB - segment: %d", ctx.getSegment());
- for (UserSessionModel persistentSession : sessions) {
+ UserSessionModel lastSession = null;
+ if (!sessions.isEmpty()) {
+ lastSession = sessions.get(sessions.size() - 1);
// Save to memory/infinispan
- UserSessionModel offlineUserSession = session.sessions().importUserSession(persistentSession, true, true);
+ session.sessions().importUserSessions(sessions, true);
}
- return true;
+ int lastCreatedOn = lastSession==null ? Time.currentTime() + 100000 : lastSession.getStarted();
+ String lastSessionId = lastSession==null ? FIRST_SESSION_ID : lastSession.getId();
+
+ log.tracef("Sessions imported to infinispan - segment: %d, lastCreatedOn: %d, lastSessionId: %s", ctx.getSegment(), lastCreatedOn, lastSessionId);
+
+ return new OfflinePersistentWorkerResult(true, ctx.getSegment(), ctx.getWorkerId(), lastCreatedOn, lastSessionId);
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java
new file mode 100644
index 0000000..8d8e3f3
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.initializer;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class OfflinePersistentWorkerContext extends SessionLoader.WorkerContext {
+
+ private final int lastCreatedOn;
+ private final String lastSessionId;
+
+ public OfflinePersistentWorkerContext(int segment, int workerId, int lastCreatedOn, String lastSessionId) {
+ super(segment, workerId);
+ this.lastCreatedOn = lastCreatedOn;
+ this.lastSessionId = lastSessionId;
+ }
+
+
+ public int getLastCreatedOn() {
+ return lastCreatedOn;
+ }
+
+ public String getLastSessionId() {
+ return lastSessionId;
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java
new file mode 100644
index 0000000..44aa2c5
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.initializer;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class OfflinePersistentWorkerResult extends SessionLoader.WorkerResult {
+
+ private final int lastCreatedOn;
+ private final String lastSessionId;
+
+
+ public OfflinePersistentWorkerResult(boolean success, int segment, int workerId, int lastCreatedOn, String lastSessionId) {
+ super(success, segment, workerId);
+ this.lastCreatedOn = lastCreatedOn;
+ this.lastSessionId = lastSessionId;
+ }
+
+
+ public int getLastCreatedOn() {
+ return lastCreatedOn;
+ }
+
+
+ public String getLastSessionId() {
+ return lastSessionId;
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
index 7df94bf..9bfb463 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
@@ -31,19 +31,20 @@ import java.util.Set;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanCacheInitializer.WorkerResult>, Serializable {
+public class SessionInitializerWorker implements DistributedCallable<String, Serializable, SessionLoader.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
- private int segment;
- private SessionLoader.LoaderContext ctx;
+
+ private SessionLoader.LoaderContext loaderCtx;
+ private SessionLoader.WorkerContext workerCtx;
private SessionLoader sessionLoader;
private transient Cache<String, Serializable> workCache;
- public void setWorkerEnvironment(int segment, SessionLoader.LoaderContext ctx, SessionLoader sessionLoader) {
- this.segment = segment;
- this.ctx = ctx;
+ public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader) {
+ this.loaderCtx = loaderCtx;
+ this.workerCtx = workerCtx;
this.sessionLoader = sessionLoader;
}
@@ -53,27 +54,28 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ser
}
@Override
- public InfinispanCacheInitializer.WorkerResult call() throws Exception {
+ public SessionLoader.WorkerResult call() throws Exception {
if (log.isTraceEnabled()) {
- log.tracef("Running computation for segment: %d", segment);
+ log.tracef("Running computation for segment: %s", workerCtx.toString());
}
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped");
- return InfinispanCacheInitializer.WorkerResult.create(segment, false);
+ return sessionLoader.createFailedWorkerResult(loaderCtx, workerCtx);
}
+ SessionLoader.WorkerResult[] ref = new SessionLoader.WorkerResult[1];
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
- sessionLoader.loadSessions(session, ctx, segment);
+ ref[0] = sessionLoader.loadSessions(session, loaderCtx, workerCtx);
}
});
- return InfinispanCacheInitializer.WorkerResult.create(segment, true);
+ return ref[0];
}
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
index 71b519e..83fe4e4 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
@@ -18,13 +18,16 @@
package org.keycloak.models.sessions.infinispan.initializer;
import java.io.Serializable;
+import java.util.List;
import org.keycloak.models.KeycloakSession;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext> extends Serializable {
+public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext,
+ WORKER_CONTEXT extends SessionLoader.WorkerContext,
+ WORKER_RESULT extends SessionLoader.WorkerResult> extends Serializable {
/**
* Will be triggered just once on cluster coordinator node to perform some generic initialization tasks (Eg. update DB before starting load).
@@ -38,7 +41,7 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
/**
*
- * Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to the worker task.
+ * Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to whole computation.
* Each segment will be then later computed in one "worker" task
*
* This method could be expensive to call, so the "computed" loaderContext object is passed among workers/loaders and needs to be serializable
@@ -50,14 +53,36 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
/**
+ * Compute the worker context for current iteration
+ *
+ * @param loaderCtx global loader context
+ * @param segment the current segment (page) to compute
+ * @param workerId ID of worker for current worker iteration. Usually the number 0-8 (with single cluster node)
+ * @param previousResults workerResults from previous computation. Can be empty list in case of the operation is triggered for the 1st time
+ * @return
+ */
+ WORKER_CONTEXT computeWorkerContext(LOADER_CONTEXT loaderCtx, int segment, int workerId, List<WORKER_RESULT> previousResults);
+
+
+ /**
* Will be called on all cluster nodes to load the specified page.
*
* @param session
- * @param loaderContext loaderContext object, which was already computed before
- * @param segment to be computed
+ * @param loaderContext global loaderContext object, which was already computed before
+ * @param workerContext for current iteration
* @return
*/
- boolean loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, int segment);
+ WORKER_RESULT loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
+
+
+ /**
+ * Called when it's not possible to compute current iteration and load session for some reason (EG. infinispan not yet fully initialized)
+ *
+ * @param loaderContext
+ * @param workerContext
+ * @return
+ */
+ WORKER_RESULT createFailedWorkerResult(LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
/**
@@ -81,9 +106,78 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
* Object, which contains some context data to be used by SessionLoader implementation. It's computed just once and then passed
* to each {@link SessionLoader}. It needs to be {@link Serializable}
*/
- interface LoaderContext extends Serializable {
+ class LoaderContext implements Serializable {
+
+ private final int segmentsCount;
+
+ public LoaderContext(int segmentsCount) {
+ this.segmentsCount = segmentsCount;
+ }
+
+
+ public int getSegmentsCount() {
+ return segmentsCount;
+ }
+
+ }
+
+
+ /**
+ * Object, which is computed before each worker iteration and contains some data to be used by the corresponding worker iteration.
+ * For example info about which segment/page should be loaded by current worker.
+ */
+ class WorkerContext implements Serializable {
+
+ private final int segment;
+ private final int workerId;
+
+ public WorkerContext(int segment, int workerId) {
+ this.segment = segment;
+ this.workerId = workerId;
+ }
+
+
+ public int getSegment() {
+ return this.segment;
+ }
+
+
+ public int getWorkerId() {
+ return this.workerId;
+ }
+ }
+
+
+ /**
+ * Result of single worker iteration
+ */
+ class WorkerResult implements Serializable {
+
+ private final boolean success;
+ private final int segment;
+ private final int workerId;
+
+
+ public WorkerResult(boolean success, int segment, int workerId) {
+ this.success = success;
+ this.segment = segment;
+ this.workerId = workerId;
+ }
+
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+
+ public int getSegment() {
+ return segment;
+ }
+
- int getSegmentsCount();
+ public int getWorkerId() {
+ return workerId;
+ }
}
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
index 233a58a..2e927df 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java
@@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remotestore;
import java.io.Serializable;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -42,7 +43,7 @@ import static org.infinispan.client.hotrod.impl.Util.await;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext>, Serializable {
+public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class);
@@ -92,26 +93,38 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
@Override
- public boolean loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext context, int segment) {
+ public WorkerContext computeWorkerContext(RemoteCacheSessionsLoaderContext loaderCtx, int segment, int workerId, List<WorkerResult> previousResults) {
+ return new WorkerContext(segment, workerId);
+ }
+
+
+ @Override
+ public WorkerResult createFailedWorkerResult(RemoteCacheSessionsLoaderContext loaderContext, WorkerContext workerContext) {
+ return new WorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId());
+ }
+
+
+ @Override
+ public WorkerResult loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext loaderContext, WorkerContext ctx) {
Cache cache = getCache(session);
Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES);
RemoteCache remoteCache = getRemoteCache(session);
- Set<Integer> myIspnSegments = getMyIspnSegments(segment, context);
+ Set<Integer> myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext);
- log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), segment);
+ log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment());
CloseableIterator<Map.Entry> iterator = null;
int countLoaded = 0;
try {
- iterator = remoteCache.retrieveEntries(null, myIspnSegments, context.getSessionsPerSegment());
+ iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment());
while (iterator.hasNext()) {
countLoaded++;
Map.Entry entry = iterator.next();
decoratedCache.putAsync(entry.getKey(), entry.getValue());
}
} catch (RuntimeException e) {
- log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), segment);
+ log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
throw e;
} finally {
if (iterator != null) {
@@ -119,9 +132,9 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
}
}
- log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), segment, countLoaded);
+ log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded);
- return true;
+ return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
index ec74871..bf8787c 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java
@@ -17,38 +17,34 @@
package org.keycloak.models.sessions.infinispan.remotestore;
-import java.io.Serializable;
-
import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
/**
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderContext, Serializable {
+public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext {
// Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches
private final int ispnSegmentsCount;
- // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
- // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
- // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
- // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
- private final int segmentsCount;
-
private final int sessionsPerSegment;
private final int sessionsTotal;
public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) {
+ super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount));
this.ispnSegmentsCount = ispnSegmentsCount;
this.sessionsPerSegment = sessionsPerSegment;
this.sessionsTotal = sessionsTotal;
- this.segmentsCount = computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount);
}
- private int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
+ // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
+ // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
+ // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
+ // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
+ private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
// No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered)
if (ispnSegments < 0) {
return 1;
@@ -68,11 +64,6 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
}
- @Override
- public int getSegmentsCount() {
- return segmentsCount;
- }
-
public int getIspnSegmentsCount() {
return ispnSegmentsCount;
}
@@ -89,7 +80,7 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
@Override
public String toString() {
return new StringBuilder("RemoteCacheSessionsLoaderContext [ ")
- .append("segmentsCount: ").append(segmentsCount)
+ .append("segmentsCount: ").append(getSegmentsCount())
.append(", ispnSegmentsCount: ").append(ispnSegmentsCount)
.append(", sessionsPerSegment: ").append(sessionsPerSegment)
.append(", sessionsTotal: ").append(sessionsTotal)
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
index a7c6c7f..7355e6c 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java
@@ -24,10 +24,11 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
@@ -42,6 +43,8 @@ import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE;
+
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
@@ -207,6 +210,15 @@ public class UserSessionAdapter implements UserSessionModel {
}
public void setLastSessionRefresh(int lastSessionRefresh) {
+ if (offline) {
+ // Received the message from the other DC that we should update the lastSessionRefresh in local cluster. Don't update DB in that case.
+ // The other DC already did.
+ Boolean ignoreRemoteCacheUpdate = (Boolean) session.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE);
+ if (ignoreRemoteCacheUpdate == null || !ignoreRemoteCacheUpdate) {
+ provider.getPersisterLastSessionRefreshStore().putLastSessionRefresh(session, entity.getId(), realm.getId(), lastSessionRefresh);
+ }
+ }
+
UserSessionUpdateTask task = new UserSessionUpdateTask() {
@Override
@@ -216,7 +228,7 @@ public class UserSessionAdapter implements UserSessionModel {
@Override
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<UserSessionEntity> sessionWrapper) {
- return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
+ return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
}
diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
index 950a4c0..8ae5053 100644
--- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java
@@ -31,6 +31,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoader;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@@ -110,7 +111,7 @@ public class RemoteCacheSessionsLoaderTest {
Set<String> visitedKeys = new HashSet<>();
for (int currentSegment=0 ; currentSegment<ctx.getSegmentsCount() ; currentSegment++) {
logger.infof("Loading segment %d", currentSegment);
- loader.loadSessions(null, ctx, currentSegment);
+ loader.loadSessions(null, ctx, new SessionLoader.WorkerContext(currentSegment, currentSegment));
logger.infof("Loaded %d keys for segment %d", cache2.keySet().size(), currentSegment);
totalCount = totalCount + cache2.keySet().size();
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
index e439b66..dd0cf32 100755
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ConcurrencyLockingTest.java
@@ -11,6 +11,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -25,7 +27,12 @@ public class ConcurrencyLockingTest {
public void testLocking() throws Exception {
final DefaultCacheManager cacheManager = getVersionedCacheManager();
Cache<String, String> cache = cacheManager.getCache("COUNTER_CACHE");
- cache.put("key", "init");
+
+ Map<String, String> map = new HashMap<>();
+ map.put("key1", "val1");
+ map.put("key2", "val2");
+ cache.putAll(map);
+
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
index 12a3ee5..719a39a 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java
@@ -17,17 +17,18 @@
package org.keycloak.models.sessions.infinispan.initializer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.Map;
import org.infinispan.Cache;
+import org.infinispan.client.hotrod.ProtocolVersion;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.commons.api.BasicCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
-import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@@ -37,7 +38,6 @@ import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
-import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;
@@ -49,20 +49,36 @@ import java.util.UUID;
@Ignore
public class DistributedCacheConcurrentWritesTest {
- private static final int ITERATION_PER_WORKER = 1000;
+ private static final int BATCHES_PER_WORKER = 1000;
+ private static final int ITEMS_IN_BATCH = 100;
- private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
- private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
+ public static void main(String[] args) throws Exception {
+ BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createCache("node1");
+ BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createCache("node2");
- private static final UUID CLIENT_1_UUID = UUID.randomUUID();
+ // NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232
+// BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createRemoteCache("node1");
+// BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createRemoteCache("node2");
+
+ try {
+ testConcurrentPut(cache1, cache2);
+ } finally {
+
+ // Kill JVM
+ cache1.stop();
+ cache2.stop();
+ stopMgr(cache1);
+ stopMgr(cache2);
+
+ System.out.println("Managers killed");
+ }
+ }
- public static void main(String[] args) throws Exception {
- CacheWrapper<String, UserSessionEntity> cache1 = createCache("node1");
- CacheWrapper<String, UserSessionEntity> cache2 = createCache("node2");
+ private static SessionEntityWrapper<UserSessionEntity> createEntityInstance(String id) {
// Create initial item
UserSessionEntity session = new UserSessionEntity();
- session.setId("123");
+ session.setId(id);
session.setRealmId("foo");
session.setBrokerSessionId("!23123123");
session.setBrokerUserId(null);
@@ -76,177 +92,97 @@ public class DistributedCacheConcurrentWritesTest {
clientSession.setAuthMethod("saml");
clientSession.setAction("something");
clientSession.setTimestamp(1234);
- session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
-
- try {
- for (int i = 0; i < 10; i++) {
- testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
- }
- } finally {
-
- // Kill JVM
- cache1.getCache().stop();
- cache2.getCache().stop();
- cache1.getCache().getCacheManager().stop();
- cache2.getCache().getCacheManager().stop();
+ session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId());
- System.out.println("Managers killed");
- }
+ return new SessionEntityWrapper<>(session);
}
// Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
- private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
- CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
- cache1.put(key, session);
+ private static void testConcurrentPut(BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1,
+ BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2) throws InterruptedException {
- // Create 2 workers for concurrent write and start them
- Worker worker1 = new Worker(1, cache1, key);
- Worker worker2 = new Worker(2, cache2, key);
+ // Create workers for concurrent write and start them
+ Worker worker1 = new Worker(1, cache1);
+ Worker worker2 = new Worker(2, cache2);
long start = System.currentTimeMillis();
- System.out.println("Started clustering test for key " + key);
+ System.out.println("Started clustering test");
worker1.start();
//worker1.join();
worker2.start();
- Thread.sleep(1000);
- // Try to remove the entity after some sleep time.
- cache1.wrappedCache.getAdvancedCache()
- .withFlags(Flag.CACHE_MODE_LOCAL)
- .remove(key);
-
worker1.join();
worker2.join();
long took = System.currentTimeMillis() - start;
- System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
-
-// System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
-// + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
+ System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size());
-// // JGroups statistics
-// JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
-// System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
-// ", received messages: " + channel.getReceivedMessages());
+ // JGroups statistics
+ printStats(cache1);
}
private static class Worker extends Thread {
- private final CacheWrapper<String, UserSessionEntity> cache;
- private final int threadId;
- private final String key;
+ private final BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache;
+ private final int startIndex;
- public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
- this.threadId = threadId;
+ public Worker(int threadId, BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache) {
this.cache = cache;
- this.key = key;
- setName("th-" + key + "-" + threadId);
+ this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER);
+ setName("th-" + threadId);
}
@Override
public void run() {
- for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {
-
- String noteKey = "n-" + threadId + "-" + i;
-
- // This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
-// boolean replaced = false;
-// while (!replaced) {
-// SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
-// oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
-// replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
-// }
-
- int count = 0;
- boolean replaced = false;
- while (!replaced && count < 25) {
- count++;
- SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
- oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
- replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
- }
- if (!replaced) {
- System.err.println("FAILED TO REPLACE ENTITY: " + key);
- return;
- }
- }
+ for (int page = 0; page < BATCHES_PER_WORKER ; page++) {
+ int startPageIndex = startIndex + page * ITEMS_IN_BATCH;
- }
+ putItemsClassic(startPageIndex);
+ //putItemsAll(startPageIndex);
- private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
- try {
- boolean replaced = cache.replace(key, oldSession, newSession);
- //cache.replace(key, newSession);
- if (!replaced) {
- failedReplaceCounter.incrementAndGet();
- //return false;
- //System.out.println("Replace failed!!!");
- }
- return replaced;
- } catch (Exception re) {
- failedReplaceCounter2.incrementAndGet();
- return false;
+ System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1));
}
- //return replaced;
}
- }
-
- // Session clone
-
- private static UserSessionEntity cloneSession(UserSessionEntity session) {
- UserSessionEntity clone = new UserSessionEntity();
- clone.setId(session.getId());
- clone.setRealmId(session.getRealmId());
- clone.setNotes(new ConcurrentHashMap<>(session.getNotes()));
- return clone;
- }
-
- // Cache creation utils
-
- public static class CacheWrapper<K, V extends SessionEntity> {
-
- private final Cache<K, SessionEntityWrapper<V>> wrappedCache;
-
- public CacheWrapper(Cache<K, SessionEntityWrapper<V>> wrappedCache) {
- this.wrappedCache = wrappedCache;
+ // put items 1 by 1
+ private void putItemsClassic(int startPageIndex) {
+ for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
+ String key = "key-" + startIndex + i;
+ SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
+ cache.put(key, session);
+ }
}
- public SessionEntityWrapper<V> get(K key) {
- SessionEntityWrapper<V> val = wrappedCache.get(key);
- return val;
- }
-
- public void put(K key, V newVal) {
- SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
- wrappedCache.put(key, newWrapper);
- }
+ // put all items together
+ private void putItemsAll(int startPageIndex) {
+ Map<String, SessionEntityWrapper<UserSessionEntity>> mapp = new HashMap<>();
+ for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
+ String key = "key-" + startIndex + i;
+ SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
+ mapp.put(key, session);
+ }
- public boolean replace(K key, SessionEntityWrapper<V> oldVal, V newVal) {
- SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
- return wrappedCache.replace(key, oldVal, newWrapper);
+ cache.putAll(mapp);
}
+ }
- private Cache<K, SessionEntityWrapper<V>> getCache() {
- return wrappedCache;
- }
- }
+ // Cache creation utils
- public static CacheWrapper<String, UserSessionEntity> createCache(String nodeName) {
+ public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createCache(String nodeName) {
EmbeddedCacheManager mgr = createManager(nodeName);
- Cache<String, SessionEntityWrapper<UserSessionEntity>> wrapped = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
- return new CacheWrapper<>(wrapped);
+ Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
+ return cache;
}
@@ -272,7 +208,7 @@ public class DistributedCacheConcurrentWritesTest {
ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
if (clustered) {
distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
- distConfigBuilder.clustering().hash().numOwners(2);
+ distConfigBuilder.clustering().hash().numOwners(1);
// Disable L1 cache
distConfigBuilder.clustering().hash().l1().enabled(false);
@@ -283,4 +219,42 @@ public class DistributedCacheConcurrentWritesTest {
return cacheManager;
}
+
+
+ public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createRemoteCache(String nodeName) {
+ int port = ("node1".equals(nodeName)) ? 12232 : 13232;
+
+ org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
+ org.infinispan.client.hotrod.configuration.Configuration cfg = builder
+ .addServer().host("localhost").port(port)
+ .version(ProtocolVersion.PROTOCOL_VERSION_26)
+ .build();
+ RemoteCacheManager mgr = new RemoteCacheManager(cfg);
+ return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
+ }
+
+ // CLEANUP METHODS
+
+ private static void stopMgr(BasicCache cache) {
+ if (cache instanceof Cache) {
+ ((Cache) cache).getCacheManager().stop();
+ } else {
+ ((RemoteCache) cache).getRemoteCacheManager().stop();
+ }
+ }
+
+
+ private static void printStats(BasicCache cache) {
+ if (cache instanceof Cache) {
+ Cache cache1 = (Cache) cache;
+
+ JChannel channel = ((JGroupsTransport)cache1.getAdvancedCache().getRpcManager().getTransport()).getChannel();
+
+ System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
+ ", received messages: " + channel.getReceivedMessages());
+ } else {
+ Map<String, String> stats = ((RemoteCache) cache).stats().getStatsMap();
+ System.out.println("Stats: " + stats);
+ }
+ }
}
diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
index 1e72aa8..8a04231 100644
--- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
+++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java
@@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.initializer;
import org.junit.Assert;
import org.junit.Test;
-import org.keycloak.models.cache.infinispan.UserCacheSession;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.storage.CacheableStorageProviderModel;
@@ -35,16 +34,16 @@ public class InitializerStateTest {
@Test
public void testOfflineLoaderContext() {
- OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
+ OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 6);
- ctx = new OfflinePersistentUserSessionLoaderContext(19, 5);
+ ctx = new OfflinePersistentLoaderContext(19, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 4);
- ctx = new OfflinePersistentUserSessionLoaderContext(20, 5);
+ ctx = new OfflinePersistentLoaderContext(20, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 4);
- ctx = new OfflinePersistentUserSessionLoaderContext(21, 5);
+ ctx = new OfflinePersistentLoaderContext(21, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 5);
}
@@ -78,7 +77,7 @@ public class InitializerStateTest {
@Test
public void testComputationState() {
- OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
+ OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 6);
InitializerState state = new InitializerState(ctx.getSegmentsCount());
diff --git a/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java
new file mode 100644
index 0000000..2aa1b11
--- /dev/null
+++ b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.connections.jpa.updater.liquibase.custom;
+
+import liquibase.exception.CustomChangeException;
+import liquibase.statement.core.UpdateStatement;
+import liquibase.structure.core.Table;
+import org.keycloak.common.util.Time;
+
+/**
+ * Update CREATED_ON and LAST_SESSION_REFRESH columns to current startup time
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class JpaUpdate4_7_0_OfflineSessionsTimestamps extends CustomKeycloakTask {
+
+ @Override
+ protected void generateStatementsImpl() throws CustomChangeException {
+ String offlineUserSessionsTableName = database.correctObjectName("OFFLINE_USER_SESSION", Table.class);
+
+ try {
+ int currentTime = Time.currentTime();
+
+ UpdateStatement updateStatement = new UpdateStatement(null, null, offlineUserSessionsTableName)
+ .addNewColumnValue("LAST_SESSION_REFRESH", currentTime);
+
+ statements.add(updateStatement);
+
+ confirmationMessage.append("Updated column LAST_SESSION_REFRESH in OFFLINE_USER_SESSION table with time " + currentTime);
+ } catch (Exception e) {
+ throw new CustomChangeException(getTaskId() + ": Exception when updating data from previous version", e);
+ }
+ }
+
+ @Override
+ protected String getTaskId() {
+ return "Update 4.7.0.Final";
+ }
+}
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
index 587ed7e..132af92 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java
@@ -18,10 +18,10 @@
package org.keycloak.models.jpa.session;
import org.jboss.logging.Logger;
+import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
-import org.keycloak.models.ModelException;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
@@ -30,17 +30,23 @@ import org.keycloak.models.session.PersistentClientSessionModel;
import org.keycloak.models.session.PersistentUserSessionAdapter;
import org.keycloak.models.session.PersistentUserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.SessionTimeoutHelper;
import org.keycloak.storage.StorageId;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
+
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -63,6 +69,7 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
PersistentUserSessionEntity entity = new PersistentUserSessionEntity();
entity.setUserSessionId(model.getUserSessionId());
+ entity.setCreatedOn(model.getStarted());
entity.setRealmId(adapter.getRealm().getId());
entity.setUserId(adapter.getUser().getId());
String offlineStr = offlineToString(offline);
@@ -100,26 +107,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
}
@Override
- public void updateUserSession(UserSessionModel userSession, boolean offline) {
- PersistentUserSessionAdapter adapter;
- if (userSession instanceof PersistentUserSessionAdapter) {
- adapter = (PersistentUserSessionAdapter) userSession;
- } else {
- adapter = new PersistentUserSessionAdapter(userSession);
- }
-
- PersistentUserSessionModel model = adapter.getUpdatedModel();
-
- String offlineStr = offlineToString(offline);
- PersistentUserSessionEntity entity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(userSession.getId(), offlineStr));
- if (entity == null) {
- throw new ModelException("UserSession with ID " + userSession.getId() + ", offline: " + offline + " not found");
- }
- entity.setLastSessionRefresh(model.getLastSessionRefresh());
- entity.setData(model.getData());
- }
-
- @Override
public void removeUserSession(String userSessionId, boolean offline) {
String offlineStr = offlineToString(offline);
@@ -200,7 +187,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
.setParameter("externalClientId", clientStorageId.getExternalId())
.executeUpdate();
}
- num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
}
@Override
@@ -213,24 +199,53 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
num = em.createNamedQuery("deleteUserSessionsByUser").setParameter("userId", userId).executeUpdate();
}
+
@Override
- public void clearDetachedUserSessions() {
- int num = em.createNamedQuery("deleteDetachedClientSessions").executeUpdate();
- num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
+ public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
+ String offlineStr = offlineToString(offline);
+
+ int us = em.createNamedQuery("updateUserSessionLastSessionRefresh")
+ .setParameter("lastSessionRefresh", lastSessionRefresh)
+ .setParameter("realmId", realm.getId())
+ .setParameter("offline", offlineStr)
+ .setParameter("userSessionIds", userSessionIds)
+ .executeUpdate();
+
+ logger.debugf("Updated lastSessionRefresh of %d user sessions in realm '%s'", us, realm.getName());
}
@Override
- public void updateAllTimestamps(int time) {
- int num = em.createNamedQuery("updateClientSessionsTimestamps").setParameter("timestamp", time).executeUpdate();
- num = em.createNamedQuery("updateUserSessionsTimestamps").setParameter("lastSessionRefresh", time).executeUpdate();
+ public void removeExpired(RealmModel realm) {
+ int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
+
+ String offlineStr = offlineToString(true);
+
+ logger.tracef("Trigger removing expired user sessions for realm '%s'", realm.getName());
+
+ int cs = em.createNamedQuery("deleteExpiredClientSessions")
+ .setParameter("realmId", realm.getId())
+ .setParameter("lastSessionRefresh", expiredOffline)
+ .setParameter("offline", offlineStr)
+ .executeUpdate();
+
+ int us = em.createNamedQuery("deleteExpiredUserSessions")
+ .setParameter("realmId", realm.getId())
+ .setParameter("lastSessionRefresh", expiredOffline)
+ .setParameter("offline", offlineStr)
+ .executeUpdate();
+
+ logger.debugf("Removed %d expired user sessions and %d expired client sessions in realm '%s'", us, cs, realm.getName());
+
}
@Override
- public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
+ public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
String offlineStr = offlineToString(offline);
TypedQuery<PersistentUserSessionEntity> query = em.createNamedQuery("findUserSessions", PersistentUserSessionEntity.class);
query.setParameter("offline", offlineStr);
+ query.setParameter("lastCreatedOn", lastCreatedOn);
+ query.setParameter("lastSessionId", lastUserSessionId);
if (firstResult != -1) {
query.setFirstResult(firstResult);
@@ -239,26 +254,14 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
query.setMaxResults(maxResults);
}
- List<PersistentUserSessionEntity> results = query.getResultList();
- List<UserSessionModel> result = new ArrayList<>();
- List<String> userSessionIds = new ArrayList<>();
- for (PersistentUserSessionEntity entity : results) {
- RealmModel realm = session.realms().getRealm(entity.getRealmId());
- try {
- UserModel user = session.users().getUserById(entity.getUserId(), realm);
- // Case when user was deleted in the meantime
- if (user == null) {
- onUserRemoved(realm, entity.getUserId());
- return loadUserSessions(firstResult, maxResults, offline);
- }
- } catch (Exception e) {
- logger.debugv(e,"Failed to load user with id {0}", entity.getUserId());
- }
+ List<PersistentUserSessionAdapter> result = query.getResultStream()
+ .map(this::toAdapter)
+ .collect(Collectors.toList());
+ Map<String, PersistentUserSessionAdapter> sessionsById = result.stream()
+ .collect(Collectors.toMap(UserSessionModel::getId, Function.identity()));
- result.add(toAdapter(realm, entity));
- userSessionIds.add(entity.getUserSessionId());
- }
+ Set<String> userSessionIds = sessionsById.keySet();
Set<String> removedClientUUIDs = new HashSet<>();
@@ -268,28 +271,17 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
query2.setParameter("offline", offlineStr);
List<PersistentClientSessionEntity> clientSessions = query2.getResultList();
- // Assume both userSessions and clientSessions ordered by userSessionId
- int j = 0;
- for (UserSessionModel ss : result) {
- PersistentUserSessionAdapter userSession = (PersistentUserSessionAdapter) ss;
- Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions(); // This is empty now and we want to fill it
-
- boolean next = true;
- while (next && j < clientSessions.size()) {
- PersistentClientSessionEntity clientSession = clientSessions.get(j);
- if (clientSession.getUserSessionId().equals(userSession.getId())) {
- PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
-
- // Case when client was removed in the meantime
- if (clientSessAdapter.getClient() == null) {
- removedClientUUIDs.add(clientSession.getClientId());
- } else {
- currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
- }
- j++;
- } else {
- next = false;
- }
+ for (PersistentClientSessionEntity clientSession : clientSessions) {
+ PersistentUserSessionAdapter userSession = sessionsById.get(clientSession.getUserSessionId());
+
+ PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
+ Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions();
+
+ // Case when client was removed in the meantime
+ if (clientSessAdapter.getClient() == null) {
+ removedClientUUIDs.add(clientSession.getClientId());
+ } else {
+ currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
}
}
}
@@ -298,12 +290,18 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
onClientRemoved(clientUUID);
}
- return result;
+ return (List) result;
+ }
+
+ private PersistentUserSessionAdapter toAdapter(PersistentUserSessionEntity entity) {
+ RealmModel realm = session.realms().getRealm(entity.getRealmId());
+ return toAdapter(realm, entity);
}
private PersistentUserSessionAdapter toAdapter(RealmModel realm, PersistentUserSessionEntity entity) {
PersistentUserSessionModel model = new PersistentUserSessionModel();
model.setUserSessionId(entity.getUserSessionId());
+ model.setStarted(entity.getCreatedOn());
model.setLastSessionRefresh(entity.getLastSessionRefresh());
model.setData(entity.getData());
model.setOffline(offlineFromString(entity.getOffline()));
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
index 44c3188..5703f18 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java
@@ -36,10 +36,9 @@ import java.io.Serializable;
@NamedQuery(name="deleteClientSessionsByClientStorageProvider", query="delete from PersistentClientSessionEntity sess where sess.clientStorageProvider = :clientStorageProvider"),
@NamedQuery(name="deleteClientSessionsByUser", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.userId = :userId)"),
@NamedQuery(name="deleteClientSessionsByUserSession", query="delete from PersistentClientSessionEntity sess where sess.userSessionId = :userSessionId and sess.offline = :offline"),
- @NamedQuery(name="deleteDetachedClientSessions", query="delete from PersistentClientSessionEntity sess where NOT EXISTS (select u.userSessionId from PersistentUserSessionEntity u where u.userSessionId = sess.userSessionId )"),
+ @NamedQuery(name="deleteExpiredClientSessions", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.realmId = :realmId AND u.offline = :offline AND u.lastSessionRefresh < :lastSessionRefresh)"),
@NamedQuery(name="findClientSessionsByUserSession", query="select sess from PersistentClientSessionEntity sess where sess.userSessionId=:userSessionId and sess.offline = :offline"),
- @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId"),
- @NamedQuery(name="updateClientSessionsTimestamps", query="update PersistentClientSessionEntity c set timestamp = :timestamp"),
+ @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId")
})
@Table(name="OFFLINE_CLIENT_SESSION")
@Entity
diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
index cc35be2..ade3ea0 100644
--- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
+++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java
@@ -34,10 +34,13 @@ import java.io.Serializable;
@NamedQueries({
@NamedQuery(name="deleteUserSessionsByRealm", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId"),
@NamedQuery(name="deleteUserSessionsByUser", query="delete from PersistentUserSessionEntity sess where sess.userId = :userId"),
- @NamedQuery(name="deleteDetachedUserSessions", query="delete from PersistentUserSessionEntity sess where NOT EXISTS (select c.userSessionId from PersistentClientSessionEntity c where c.userSessionId = sess.userSessionId)"),
+ @NamedQuery(name="deleteExpiredUserSessions", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId AND sess.offline = :offline AND sess.lastSessionRefresh < :lastSessionRefresh"),
+ @NamedQuery(name="updateUserSessionLastSessionRefresh", query="update PersistentUserSessionEntity sess set lastSessionRefresh = :lastSessionRefresh where sess.realmId = :realmId" +
+ " AND sess.offline = :offline AND sess.userSessionId IN (:userSessionIds)"),
@NamedQuery(name="findUserSessionsCount", query="select count(sess) from PersistentUserSessionEntity sess where sess.offline = :offline"),
- @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline order by sess.userSessionId"),
- @NamedQuery(name="updateUserSessionsTimestamps", query="update PersistentUserSessionEntity c set lastSessionRefresh = :lastSessionRefresh"),
+ @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline" +
+ " AND (sess.createdOn > :lastCreatedOn OR (sess.createdOn = :lastCreatedOn AND sess.userSessionId > :lastSessionId))" +
+ " order by sess.createdOn,sess.userSessionId")
})
@Table(name="OFFLINE_USER_SESSION")
@@ -55,6 +58,9 @@ public class PersistentUserSessionEntity {
@Column(name="USER_ID", length = 255)
protected String userId;
+ @Column(name = "CREATED_ON")
+ protected int createdOn;
+
@Column(name = "LAST_SESSION_REFRESH")
protected int lastSessionRefresh;
@@ -90,6 +96,14 @@ public class PersistentUserSessionEntity {
this.userId = userId;
}
+ public int getCreatedOn() {
+ return createdOn;
+ }
+
+ public void setCreatedOn(int createdOn) {
+ this.createdOn = createdOn;
+ }
+
public int getLastSessionRefresh() {
return lastSessionRefresh;
}
diff --git a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
index f8c9693..43a9871 100644
--- a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
+++ b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml
@@ -19,9 +19,30 @@
<changeSet author="sguilhen@redhat.com" id="4.7.0-KEYCLOAK-1267">
<addColumn tableName="REALM">
- <column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT"/>
- <column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT"/>
+ <column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
+ <column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
</addColumn>
</changeSet>
+
+ <changeSet author="keycloak" id="4.7.0-KEYCLOAK-7275">
+ <renameColumn tableName="OFFLINE_USER_SESSION" oldColumnName="LAST_SESSION_REFRESH" newColumnName="CREATED_ON" columnDataType="INT" />
+
+ <addNotNullConstraint tableName="OFFLINE_USER_SESSION" columnName="CREATED_ON" columnDataType="INT" defaultNullValue="0" />
+
+ <addColumn tableName="OFFLINE_USER_SESSION">
+ <column name="LAST_SESSION_REFRESH" type="INT" defaultValueNumeric="0">
+ <constraints nullable="false"/>
+ </column>
+ </addColumn>
+
+ <!--Update "lastSessionRefresh" to the current time when migrating from previous version-->
+ <customChange class="org.keycloak.connections.jpa.updater.liquibase.custom.JpaUpdate4_7_0_OfflineSessionsTimestamps"/>
+
+ <createIndex indexName="IDX_OFFLINE_USS_CREATEDON" tableName="OFFLINE_USER_SESSION">
+ <column name="CREATED_ON" type="INT"/>
+ </createIndex>
+
+ </changeSet>
+
</databaseChangeLog>
diff --git a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
index ac1c224..61d563d 100755
--- a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
@@ -19,6 +19,7 @@ package org.keycloak.models;
import org.keycloak.provider.Provider;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -63,7 +64,7 @@ public interface UserSessionProvider extends Provider {
void removeUserSession(RealmModel realm, UserSessionModel session);
void removeUserSessions(RealmModel realm, UserModel user);
- /** Implementation should propagate removal of expired userSessions to userSessionPersister too **/
+ /** Implementation doesn't need to propagate removal of expired userSessions to userSessionPersister. Cleanup on persister will be called separately **/
void removeExpired(RealmModel realm);
void removeUserSessions(RealmModel realm);
@@ -89,8 +90,8 @@ public interface UserSessionProvider extends Provider {
long getOfflineSessionsCount(RealmModel realm, ClientModel client);
List<UserSessionModel> getOfflineUserSessions(RealmModel realm, ClientModel client, int first, int max);
- /** Triggered by persister during pre-load. It optionally imports authenticatedClientSessions too if requested. Otherwise the imported UserSession will have empty list of AuthenticationSessionModel **/
- UserSessionModel importUserSession(UserSessionModel persistentUserSession, boolean offline, boolean importAuthenticatedClientSessions);
+ /** Triggered by persister during pre-load. It imports authenticatedClientSessions too **/
+ void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline);
void close();
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
index 10b28a2..f818f16 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java
@@ -26,6 +26,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -75,11 +76,6 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
}
@Override
- public void updateUserSession(UserSessionModel userSession, boolean offline) {
-
- }
-
- @Override
public void removeUserSession(String userSessionId, boolean offline) {
}
@@ -105,17 +101,17 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
}
@Override
- public void clearDetachedUserSessions() {
+ public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
}
@Override
- public void updateAllTimestamps(int time) {
+ public void removeExpired(RealmModel realm) {
}
@Override
- public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
+ public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
return Collections.emptyList();
}
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
index 61d5818..7fcec1c 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java
@@ -54,12 +54,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
data.setIpAddress(other.getIpAddress());
data.setNotes(other.getNotes());
data.setRememberMe(other.isRememberMe());
- data.setStarted(other.getStarted());
if (other.getState() != null) {
data.setState(other.getState().toString());
}
this.model = new PersistentUserSessionModel();
+ this.model.setStarted(other.getStarted());
this.model.setUserSessionId(other.getId());
this.model.setLastSessionRefresh(other.getLastSessionRefresh());
@@ -157,7 +157,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
@Override
public int getStarted() {
- return getData().getStarted();
+ return model.getStarted();
}
@Override
@@ -274,6 +274,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
@JsonProperty("rememberMe")
private boolean rememberMe;
+ // TODO: Keeping those just for backwards compatibility. @JsonIgnoreProperties doesn't work on Wildfly - probably due to classloading issues
@JsonProperty("started")
private int started;
@@ -323,10 +324,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
this.rememberMe = rememberMe;
}
+ @Deprecated
public int getStarted() {
return started;
}
+ @Deprecated
public void setStarted(int started) {
this.started = started;
}
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
index ced1768..508b817 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java
@@ -23,6 +23,7 @@ package org.keycloak.models.session;
public class PersistentUserSessionModel {
private String userSessionId;
+ private int started;
private int lastSessionRefresh;
private boolean offline;
private String data;
@@ -35,6 +36,14 @@ public class PersistentUserSessionModel {
this.userSessionId = userSessionId;
}
+ public int getStarted() {
+ return started;
+ }
+
+ public void setStarted(int started) {
+ this.started = started;
+ }
+
public int getLastSessionRefresh() {
return lastSessionRefresh;
}
diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
index ba5a595..11c7567 100644
--- a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
+++ b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java
@@ -24,6 +24,7 @@ import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.provider.Provider;
+import java.util.Collection;
import java.util.List;
/**
@@ -37,8 +38,6 @@ public interface UserSessionPersisterProvider extends Provider {
// Assuming that corresponding userSession is already persisted
void createClientSession(AuthenticatedClientSessionModel clientSession, boolean offline);
- void updateUserSession(UserSessionModel userSession, boolean offline);
-
// Called during logout (for online session) or during periodic expiration. It will remove all corresponding clientSessions too
void removeUserSession(String userSessionId, boolean offline);
@@ -49,14 +48,14 @@ public interface UserSessionPersisterProvider extends Provider {
void onClientRemoved(RealmModel realm, ClientModel client);
void onUserRemoved(RealmModel realm, UserModel user);
- // Called at startup to remove userSessions without any clientSession
- void clearDetachedUserSessions();
+ // Bulk update of lastSessionRefresh of all specified userSessions to the given value.
+ void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline);
- // Update "lastSessionRefresh" of all userSessions and "timestamp" of all clientSessions to specified time
- void updateAllTimestamps(int time);
+ // Remove userSessions and clientSessions, which are expired
+ void removeExpired(RealmModel realm);
// Called during startup. For each userSession, it loads also clientSessions
- List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline);
+ List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId);
int getUserSessionsCount(boolean offline);
diff --git a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
index fc3c584..1311635 100755
--- a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
+++ b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
@@ -309,7 +309,7 @@ public class TokenManager {
if (clientSession.getCurrentRefreshToken() != null &&
!refreshToken.getId().equals(clientSession.getCurrentRefreshToken()) &&
refreshToken.getIssuedAt() < clientSession.getTimestamp() &&
- clusterStartupTime != clientSession.getTimestamp()) {
+ clusterStartupTime <= clientSession.getTimestamp()) {
throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
}
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
index e61969a..4ebd599 100755
--- a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
+++ b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java
@@ -20,6 +20,7 @@ package org.keycloak.services.scheduled;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionProvider;
+import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.timer.ScheduledTask;
/**
@@ -35,6 +36,7 @@ public class ClearExpiredUserSessions implements ScheduledTask {
for (RealmModel realm : session.realms().getRealms()) {
sessions.removeExpired(realm);
session.authenticationSessions().removeExpired(realm);
+ session.getProvider(UserSessionPersisterProvider.class).removeExpired(realm);
}
}
diff --git a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
index fca6e02..7e93627 100644
--- a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
+++ b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java
@@ -44,7 +44,7 @@ import org.keycloak.models.UserCredentialModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserProvider;
import org.keycloak.models.UserSessionModel;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
import org.keycloak.models.utils.ModelToRepresentation;
import org.keycloak.protocol.oidc.OIDCLoginProtocol;
import org.keycloak.protocol.oidc.mappers.AudienceProtocolMapper;
@@ -695,8 +695,8 @@ public class TestingResourceProvider implements RealmResourceProvider {
@Produces(MediaType.APPLICATION_JSON)
public Response suspendPeriodicTasks() {
suspendTask(ClearExpiredUserSessions.TASK_NAME);
- suspendTask(LastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
- suspendTask(LastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
+ suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
+ suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
return Response.noContent().build();
}
diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
index f10ca46..51eb60c 100644
--- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
+++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java
@@ -31,8 +31,8 @@ import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
-import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
+import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.SessionData;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.representations.idm.RealmRepresentation;
@@ -69,7 +69,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Override
public void run(KeycloakSession session) {
- LastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
+ CrossDCLastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
System.out.println("sss");
int lastSessionRefresh = Time.currentTime();
@@ -113,7 +113,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Override
public void run(KeycloakSession session) {
// Long timer interval. No message due the timer wasn't executed
- LastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
+ CrossDCLastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
Time.setOffset(100);
try {
@@ -124,7 +124,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
Assert.assertEquals(0, counter.get());
// Short timer interval 10 ms. 1 message due the interval is executed and lastRun was in the past due to Time.setOffset
- LastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
+ CrossDCLastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
Time.setOffset(200);
Retry.execute(() -> {
@@ -152,12 +152,12 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
AtomicInteger counter = new AtomicInteger();
- LastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
- LastSessionRefreshStoreFactory factory = new LastSessionRefreshStoreFactory() {
+ CrossDCLastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
+ CrossDCLastSessionRefreshStoreFactory factory = new CrossDCLastSessionRefreshStoreFactory() {
@Override
- protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
- return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
+ protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
+ return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
@Override
protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
index 172dd3f..a95e27f 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
@@ -97,9 +97,9 @@ public class UserSessionInitializerTest {
List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, testApp, 0, 10);
UserSessionProviderTest.assertSessions(loadedSessions, origSessions);
- UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "test-app", "third-party");
- UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, serverStartTime, "test-app");
- UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, serverStartTime, "test-app");
+ UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "test-app", "third-party");
+ UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
+ UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
}
@@ -130,7 +130,7 @@ public class UserSessionInitializerTest {
List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, thirdparty, 0, 10);
Assert.assertEquals(1, loadedSessions.size());
- UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "third-party");
+ UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "third-party");
// Revert client
realm.addClient("test-app");
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
index 8a3ffa9..7c10701 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java
@@ -22,6 +22,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@@ -37,8 +38,12 @@ import org.keycloak.models.UserManager;
import org.keycloak.testsuite.rule.KeycloakRule;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@@ -112,51 +117,6 @@ public class UserSessionPersisterProviderTest {
assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
}
- @Test
- public void testUpdateTimestamps() {
- // Create some sessions in infinispan
- int started = Time.currentTime();
- UserSessionModel[] origSessions = createSessions();
-
- resetSession();
-
- // Persist 3 created userSessions and clientSessions as offline
- ClientModel testApp = realm.getClientByClientId("test-app");
- List<UserSessionModel> userSessions = session.sessions().getUserSessions(realm, testApp);
- for (UserSessionModel userSession : userSessions) {
- persistUserSession(userSession, true);
- }
-
- // Persist 1 online session
- UserSessionModel userSession = session.sessions().getUserSession(realm, origSessions[0].getId());
- persistUserSession(userSession, false);
-
- resetSession();
-
- // update timestamps
- int newTime = started + 50;
- persister.updateAllTimestamps(newTime);
-
- // Assert online session
- List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(false, 1, 1, 1);
- Assert.assertEquals(2, assertTimestampsUpdated(loadedSessions, newTime));
-
- // Assert offline sessions
- loadedSessions = loadPersistedSessionsPaginated(true, 2, 2, 3);
- Assert.assertEquals(4, assertTimestampsUpdated(loadedSessions, newTime));
- }
-
- private int assertTimestampsUpdated(List<UserSessionModel> loadedSessions, int expectedTime) {
- int clientSessionsCount = 0;
- for (UserSessionModel loadedSession : loadedSessions) {
- Assert.assertEquals(expectedTime, loadedSession.getLastSessionRefresh());
- for (AuthenticatedClientSessionModel clientSession : loadedSession.getAuthenticatedClientSessions().values()) {
- Assert.assertEquals(expectedTime, clientSession.getTimestamp());
- clientSessionsCount++;
- }
- }
- return clientSessionsCount;
- }
@Test
public void testUpdateAndRemove() {
@@ -177,48 +137,30 @@ public class UserSessionPersisterProviderTest {
UserSessionModel persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
- // Update userSession
- Time.setOffset(10);
- try {
- persistedSession.setLastSessionRefresh(Time.currentTime());
- persistedSession.setNote("foo", "bar");
- persistedSession.setState(UserSessionModel.State.LOGGED_IN);
- persister.updateUserSession(persistedSession, true);
-
- // create new clientSession
- AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
- "http://redirect", "state");
- persister.createClientSession(clientSession, true);
+ // create new clientSession
+ AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
+ "http://redirect", "state");
+ persister.createClientSession(clientSession, true);
- resetSession();
-
- // Assert session updated
- loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
- persistedSession = loadedSessions.get(0);
- UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started+10, "test-app", "third-party");
- Assert.assertEquals("bar", persistedSession.getNote("foo"));
- Assert.assertEquals(UserSessionModel.State.LOGGED_IN, persistedSession.getState());
+ resetSession();
- // Remove clientSession
- persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
+ // Remove clientSession
+ persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
- resetSession();
+ resetSession();
- // Assert clientSession removed
- loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
- persistedSession = loadedSessions.get(0);
- UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started + 10, "test-app");
+ // Assert clientSession removed
+ loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
+ persistedSession = loadedSessions.get(0);
+ UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started , "test-app");
- // Remove userSession
- persister.removeUserSession(persistedSession.getId(), true);
+ // Remove userSession
+ persister.removeUserSession(persistedSession.getId(), true);
- resetSession();
+ resetSession();
- // Assert nothing found
- loadPersistedSessionsPaginated(true, 10, 0, 0);
- } finally {
- Time.setOffset(0);
- }
+ // Assert nothing found
+ loadPersistedSessionsPaginated(true, 10, 0, 0);
}
@Test
@@ -302,8 +244,8 @@ public class UserSessionPersisterProviderTest {
resetSession();
- // Assert nothing loaded - userSession was removed as well because it was last userSession
- loadPersistedSessionsPaginated(true, 10, 0, 0);
+ // Assert loading still works - last userSession is still there, but no clientSession on it
+ loadPersistedSessionsPaginated(true, 10, 1, 1);
// Cleanup
realmMgr = new RealmManager(session);
@@ -340,23 +282,108 @@ public class UserSessionPersisterProviderTest {
UserSessionModel persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
- // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly"
+ // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly".
+ // No exception will happen. However session will be still there
UserModel user2 = session.users().getUserByUsername("user2", realm);
session.users().removeUser(realm, user2);
- loadedSessions = loadPersistedSessionsPaginated(true, 10, 0, 0);
+ loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
+ // Cleanup
+ UserSessionModel userSession = loadedSessions.get(0);
+ session.sessions().removeUserSession(realm, userSession);
+ persister.removeUserSession(userSession.getId(), userSession.isOffline());
}
// KEYCLOAK-1999
@Test
public void testNoSessions() {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
- List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true);
+ List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true, 0, "abc");
Assert.assertEquals(0, sessions.size());
}
+ @Test
+ public void testMoreSessions() {
+ // Create 10 userSessions - each having 1 clientSession
+ List<UserSessionModel> userSessions = new ArrayList<>();
+ UserModel user = session.users().getUserByUsername("user1", realm);
+
+ for (int i=0 ; i<20 ; i++) {
+ // Having different offsets for each session (to ensure that lastSessionRefresh is also different)
+ Time.setOffset(i);
+
+ UserSessionModel userSession = session.sessions().createUserSession(realm, user, "user1", "127.0.0.1", "form", true, null, null);
+ createClientSession(realm.getClientByClientId("test-app"), userSession, "http://redirect", "state");
+ userSessions.add(userSession);
+ }
+
+ resetSession();
+
+ for (UserSessionModel userSession : userSessions) {
+ UserSessionModel userSession2 = session.sessions().getUserSession(realm, userSession.getId());
+ persistUserSession(userSession2, true);
+ }
+
+ resetSession();
+
+ List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 2, 10, 20);
+ user = session.users().getUserByUsername("user1", realm);
+ ClientModel testApp = realm.getClientByClientId("test-app");
+
+ for (UserSessionModel loadedSession : loadedSessions) {
+ assertEquals(user.getId(), loadedSession.getUser().getId());
+ assertEquals("127.0.0.1", loadedSession.getIpAddress());
+ assertEquals(user.getUsername(), loadedSession.getLoginUsername());
+
+ assertEquals(1, loadedSession.getAuthenticatedClientSessions().size());
+ assertTrue(loadedSession.getAuthenticatedClientSessions().containsKey(testApp.getId()));
+ }
+ }
+
+
+ @Test
+ public void testExpiredSessions() {
+ // Create some sessions in infinispan
+ int started = Time.currentTime();
+ UserSessionModel[] origSessions = createSessions();
+
+ resetSession();
+
+ // Persist 2 offline sessions of 2 users
+ UserSessionModel userSession1 = session.sessions().getUserSession(realm, origSessions[1].getId());
+ UserSessionModel userSession2 = session.sessions().getUserSession(realm, origSessions[2].getId());
+ persistUserSession(userSession1, true);
+ persistUserSession(userSession2, true);
+
+ resetSession();
+
+ // Update one of the sessions with lastSessionRefresh of 20 days ahead
+ int lastSessionRefresh = Time.currentTime() + 1728000;
+ persister.updateLastSessionRefreshes(realm, lastSessionRefresh, Collections.singleton(userSession1.getId()), true);
+
+ resetSession();
+
+ // Increase time offset - 40 days
+ Time.setOffset(3456000);
+ try {
+ // Run expiration thread
+ persister.removeExpired(realm);
+
+ // Test the updated session is still in persister. Not updated session is not there anymore
+ List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
+ UserSessionModel persistedSession = loadedSessions.get(0);
+ UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, lastSessionRefresh, "test-app");
+
+ } finally {
+ // Cleanup
+ Time.setOffset(0);
+ }
+
+ }
+
+
private AuthenticatedClientSessionModel createClientSession(ClientModel client, UserSessionModel userSession, String redirect, String state) {
AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, client, userSession);
clientSession.setRedirectUri(redirect);
@@ -407,19 +434,32 @@ public class UserSessionPersisterProviderTest {
private List<UserSessionModel> loadPersistedSessionsPaginated(boolean offline, int sessionsPerPage, int expectedPageCount, int expectedSessionsCount) {
int count = persister.getUserSessionsCount(offline);
- int start = 0;
+
int pageCount = 0;
boolean next = true;
List<UserSessionModel> result = new ArrayList<>();
- while (next && start < count) {
- List<UserSessionModel> sess = persister.loadUserSessions(start, sessionsPerPage, offline);
- if (sess.size() == 0) {
+ int lastCreatedOn = 0;
+ String lastSessionId = "abc";
+
+ while (next) {
+ List<UserSessionModel> sess = persister.loadUserSessions(0, sessionsPerPage, offline, lastCreatedOn, lastSessionId);
+
+ if (sess.size() < sessionsPerPage) {
next = false;
+
+ // We had at least some session
+ if (sess.size() > 0) {
+ pageCount++;
+ }
} else {
pageCount++;
- start += sess.size();
- result.addAll(sess);
+
+ UserSessionModel lastSession = sess.get(sess.size() - 1);
+ lastCreatedOn = lastSession.getStarted();
+ lastSessionId = lastSession.getId();
}
+
+ result.addAll(sess);
}
Assert.assertEquals(pageCount, expectedPageCount);
diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
index 4f3ca20..381de75 100644
--- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
+++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java
@@ -363,24 +363,45 @@ public class UserSessionProviderOfflineTest {
// sessions are in persister too
Assert.assertEquals(3, persister.getUserSessionsCount(true));
- // Set lastSessionRefresh to session[0] to 0
- session0.setLastSessionRefresh(0);
+ // Increase timeOffset - 5 minutes
+ Time.setOffset(300);
+ try {
- resetSession();
+ // Update lastSessionRefresh of session0. This will update lastSessionRefresh of all the sessions to DB as they were not yet updated to DB
+ session0.setLastSessionRefresh(Time.currentTime());
- session.sessions().removeExpired(realm);
+ resetSession();
- resetSession();
+ // Increase timeOffset - 20 days
+ Time.setOffset(1728000);
- // assert session0 not found now
- Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
+ session0 = session.sessions().getOfflineUserSession(realm, origSessions[0].getId());
+ session0.setLastSessionRefresh(Time.currentTime());
- Assert.assertEquals(2, persister.getUserSessionsCount(true));
+ resetSession();
+
+ // Increase timeOffset - 40 days
+ Time.setOffset(3456000);
+
+ // Expire and ensure that all sessions despite session0 were removed
+
+ session.sessions().removeExpired(realm);
+ persister.removeExpired(realm);
+
+ resetSession();
+
+ // assert session0 is the only session found
+ Assert.assertNotNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
+ Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[1].getId()));
+ Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[2].getId()));
+
+ Assert.assertEquals(1, persister.getUserSessionsCount(true));
+
+ // Expire everything and assert nothing found
+ Time.setOffset(6000000);
- // Expire everything and assert nothing found
- Time.setOffset(3000000);
- try {
session.sessions().removeExpired(realm);
+ persister.removeExpired(realm);
resetSession();
diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
index 0c1a506..796e3c6 100644
--- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
+++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java
@@ -17,9 +17,17 @@
package org.keycloak.testsuite.util.cli;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.keycloak.models.KeycloakSession;
-import org.keycloak.models.UserSessionProvider;
-import org.keycloak.models.UserSessionProviderFactory;
+import org.keycloak.models.KeycloakSessionFactory;
+import org.keycloak.models.UserSessionModel;
+import org.keycloak.models.session.UserSessionPersisterProvider;
+import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@@ -33,15 +41,97 @@ public class LoadPersistentSessionsCommand extends AbstractCommand {
@Override
protected void doRunCommand(KeycloakSession session) {
- int sessionsPerSegment = getIntArg(0);
- UserSessionProviderFactory sessionProviderFactory = (UserSessionProviderFactory) sessionFactory.getProviderFactory(UserSessionProvider.class);
- sessionProviderFactory.loadPersistentSessions(sessionFactory, 10, sessionsPerSegment);
+ final int workersCount = getIntArg(0);
+ final int limit = getIntArg(1);
+ //int workersCount = 8;
+ //int limit = 64;
+
+ AtomicInteger lastCreatedOn = new AtomicInteger(0);
+ AtomicReference<String> lastSessionId = new AtomicReference<>("abc");
+
+ AtomicBoolean finished = new AtomicBoolean(false);
+ int i=0;
+
+ while (!finished.get()) {
+ if (i % 16 == 0) {
+ log.infof("Starting iteration: %s . lastCreatedOn: %d, lastSessionId: %s", i, lastCreatedOn.get(), lastSessionId.get());
+ }
+
+ i = i + workersCount;
+ List<Thread> workers = new LinkedList<>();
+ MyWorker lastWorker = null;
+
+ for (int workerId = 0 ; workerId < workersCount ; workerId++) {
+ lastWorker = new MyWorker(workerId, lastCreatedOn.get(), lastSessionId.get(), limit, sessionFactory);
+ Thread worker = new Thread(lastWorker);
+ workers.add(worker);
+ }
+
+ for (Thread worker : workers) {
+ worker.start();
+ }
+ for (Thread worker : workers) {
+ try {
+ worker.join();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ List<UserSessionModel> lastWorkerSessions = lastWorker.getLoadedSessions();
+
+ if (lastWorkerSessions.size() < limit) {
+ finished.set(true);
+ } else {
+ UserSessionModel lastSession = lastWorkerSessions.get(lastWorkerSessions.size() - 1);
+ lastCreatedOn.set(lastSession.getStarted());
+ lastSessionId.set(lastSession.getId());
+ }
+
+
+ }
log.info("All persistent sessions loaded successfully");
}
@Override
public String printUsage() {
- return super.printUsage() + " <sessions-count-per-segment>";
+ return super.printUsage() + " <workers-count (for example 8)> <limit (for example 64)>";
+ }
+
+
+ private class MyWorker implements Runnable {
+
+ private final int workerId;
+ private final int lastCreatedOn;
+ private final String lastSessionId;
+ private final int limit;
+ private final KeycloakSessionFactory sessionFactory;
+
+ private List<UserSessionModel> loadedSessions = new LinkedList<>();
+
+ public MyWorker(int workerId, int lastCreatedOn, String lastSessionId, int limit, KeycloakSessionFactory sessionFactory) {
+ this.workerId = workerId;
+ this.lastCreatedOn = lastCreatedOn;
+ this.lastSessionId = lastSessionId;
+ this.limit = limit;
+ this.sessionFactory = sessionFactory;
+ }
+
+ @Override
+ public void run() {
+ KeycloakModelUtils.runJobInTransaction(sessionFactory, (keycloakSession) -> {
+ int offset = workerId * limit;
+
+ UserSessionPersisterProvider persister = keycloakSession.getProvider(UserSessionPersisterProvider.class);
+ loadedSessions = persister.loadUserSessions(offset, limit, true, lastCreatedOn, lastSessionId);
+
+ });
+ }
+
+
+ private List<UserSessionModel> getLoadedSessions() {
+ return loadedSessions;
+ }
}
}
diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
index 1e4cb48..6b8dd25 100644
--- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
+++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java
@@ -19,6 +19,7 @@ package org.keycloak.testsuite.util.cli;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@@ -36,6 +37,8 @@ import org.keycloak.models.utils.KeycloakModelUtils;
*/
public class PersistSessionsCommand extends AbstractCommand {
+ private AtomicInteger userCounter = new AtomicInteger();
+
@Override
public String getName() {
return "persistSessions";
@@ -75,12 +78,18 @@ public class PersistSessionsCommand extends AbstractCommand {
@Override
public void run(KeycloakSession session) {
RealmModel realm = session.realms().getRealmByName("master");
- UserModel john = session.users().getUserByUsername("admin", realm);
+
ClientModel testApp = realm.getClientByClientId("security-admin-console");
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
for (int i = 0; i < countInThisBatch; i++) {
- UserSessionModel userSession = session.sessions().createUserSession(realm, john, "john-doh@localhost", "127.0.0.2", "form", true, null, null);
+ String username = "john-" + userCounter.incrementAndGet();
+ UserModel john = session.users().getUserByUsername(username, realm);
+ if (john == null) {
+ john = session.users().addUser(realm, username);
+ }
+
+ UserSessionModel userSession = session.sessions().createUserSession(realm, john, username, "127.0.0.2", "form", true, null, null);
AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, testApp, userSession);
clientSession.setRedirectUri("http://redirect");
clientSession.setNote("foo", "bar-" + i);