keycloak-uncached
Merge pull request #4385 from mposolda/ispn-clientListeners-bugs KEYCLOAK-3298 …
8/14/2017 11:46:30 AM
Changes
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index c1a8ed6..1eb58b5 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -52,6 +52,7 @@ import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
+import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
import java.util.Iterator;
@@ -59,6 +60,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -396,11 +398,11 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
int expired = Time.currentTime() - realm.getSsoSessionMaxLifespan();
int expiredRefresh = Time.currentTime() - realm.getSsoSessionIdleTimeout();
+ FuturesHelper futures = new FuturesHelper();
+
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(sessionCache);
- int[] counter = { 0 };
-
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
// Ignore remoteStore for stream iteration. But we will invoke remoteStore for userSession removal propagate
@@ -413,14 +415,15 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(String sessionId) {
- counter[0]++;
- tx.remove(localCache, sessionId);
+ Future future = localCache.removeAsync(sessionId);
+ futures.addTask(future);
}
});
+ futures.waitForAllToFinish();
- log.debugf("Removed %d expired user sessions for realm '%s'", counter[0], realm.getName());
+ log.debugf("Removed %d expired user sessions for realm '%s'", futures.size(), realm.getName());
}
private void removeExpiredOfflineUserSessions(RealmModel realm) {
@@ -432,7 +435,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
UserSessionPredicate predicate = UserSessionPredicate.create(realm.getId()).expired(null, expiredOffline);
- final int[] counter = { 0 };
+ FuturesHelper futures = new FuturesHelper();
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
@@ -446,8 +449,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(UserSessionEntity userSessionEntity) {
- counter[0]++;
- tx.remove(localCache, userSessionEntity.getId());
+ Future future = localCache.removeAsync(userSessionEntity.getId());
+ futures.addTask(future);
// TODO:mposolda can be likely optimized to delete all expired at one step
persister.removeUserSession( userSessionEntity.getId(), true);
@@ -459,7 +462,9 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
});
- log.debugf("Removed %d expired offline user sessions for realm '%s'", counter, realm.getName());
+ futures.waitForAllToFinish();
+
+ log.debugf("Removed %d expired offline user sessions for realm '%s'", futures.size(), realm.getName());
}
@Override
@@ -475,6 +480,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeLocalUserSessions(String realmId, boolean offline) {
+ FuturesHelper futures = new FuturesHelper();
+
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(cache);
@@ -489,11 +496,16 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(String sessionId) {
- // Remove session from remoteCache too
- localCache.remove(sessionId);
+ // Remove session from remoteCache too. Use removeAsync for better perf
+ Future future = localCache.removeAsync(sessionId);
+ futures.addTask(future);
}
});
+
+ futures.waitForAllToFinish();
+
+ log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) futures.size(), realmId, offline);
}
@Override
@@ -529,6 +541,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeAllLocalUserLoginFailuresEvent(String realmId) {
+ FuturesHelper futures = new FuturesHelper();
+
Cache<LoginFailureKey, LoginFailureEntity> localCache = CacheDecorators.localCache(loginFailureCache);
Cache<LoginFailureKey, LoginFailureEntity> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
@@ -539,9 +553,14 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
.filter(UserLoginFailurePredicate.create(realmId))
.map(Mappers.loginFailureId())
.forEach(loginFailureKey -> {
- // Remove loginFailure from remoteCache too
- localCache.remove(loginFailureKey);
+ // Remove loginFailure from remoteCache too. Use removeAsync for better perf
+ Future future = localCache.removeAsync(loginFailureKey);
+ futures.addTask(future);
});
+
+ futures.waitForAllToFinish();
+
+ log.debugf("Removed %d login failures in realm %s", futures.size(), realmId);
}
@Override
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java
new file mode 100644
index 0000000..6f088dc
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Not thread-safe. Assumes tasks are added from single thread.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class FuturesHelper {
+
+ private static final Logger log = Logger.getLogger(FuturesHelper.class);
+
+ private final Queue<Future> futures = new LinkedList<>();
+
+
+ public void addTask(Future future) {
+ this.futures.add(future);
+ }
+
+
+ public void waitForAllToFinish() {
+ for (Future future : futures) {
+ try {
+ future.get();
+ } catch (ExecutionException | InterruptedException ee) {
+ log.error("Exception when waiting for future", ee); // TODO Possibly some good mechanism to avoid swamp log with many same exceptions?
+ }
+ }
+ }
+
+
+ public int size() {
+ return futures.size();
+ }
+
+}