keycloak-uncached

Merge pull request #4385 from mposolda/ispn-clientListeners-bugs KEYCLOAK-3298

8/14/2017 11:46:30 AM

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index c1a8ed6..1eb58b5 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -52,6 +52,7 @@ import org.keycloak.models.sessions.infinispan.stream.Mappers;
 import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
 import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
 import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
+import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
 import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
 
 import java.util.Iterator;
@@ -59,6 +60,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
@@ -396,11 +398,11 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
         int expired = Time.currentTime() - realm.getSsoSessionMaxLifespan();
         int expiredRefresh = Time.currentTime() - realm.getSsoSessionIdleTimeout();
 
+        FuturesHelper futures = new FuturesHelper();
+
         // Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
         Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(sessionCache);
 
-        int[] counter = { 0 };
-
         Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
 
         // Ignore remoteStore for stream iteration. But we will invoke remoteStore for userSession removal propagate
@@ -413,14 +415,15 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
                     @Override
                     public void accept(String sessionId) {
-                        counter[0]++;
-                        tx.remove(localCache, sessionId);
+                        Future future = localCache.removeAsync(sessionId);
+                        futures.addTask(future);
                     }
 
                 });
 
+        futures.waitForAllToFinish();
 
-        log.debugf("Removed %d expired user sessions for realm '%s'", counter[0], realm.getName());
+        log.debugf("Removed %d expired user sessions for realm '%s'", futures.size(), realm.getName());
     }
 
     private void removeExpiredOfflineUserSessions(RealmModel realm) {
@@ -432,7 +435,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
         UserSessionPredicate predicate = UserSessionPredicate.create(realm.getId()).expired(null, expiredOffline);
 
-        final int[] counter = { 0 };
+        FuturesHelper futures = new FuturesHelper();
 
         Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
 
@@ -446,8 +449,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
                     @Override
                     public void accept(UserSessionEntity userSessionEntity) {
-                        counter[0]++;
-                        tx.remove(localCache, userSessionEntity.getId());
+                        Future future = localCache.removeAsync(userSessionEntity.getId());
+                        futures.addTask(future);
 
                         // TODO:mposolda can be likely optimized to delete all expired at one step
                         persister.removeUserSession( userSessionEntity.getId(), true);
@@ -459,7 +462,9 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
                     }
                 });
 
-        log.debugf("Removed %d expired offline user sessions for realm '%s'", counter, realm.getName());
+        futures.waitForAllToFinish();
+
+        log.debugf("Removed %d expired offline user sessions for realm '%s'", futures.size(), realm.getName());
     }
 
     @Override
@@ -475,6 +480,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
     }
 
     private void removeLocalUserSessions(String realmId, boolean offline) {
+        FuturesHelper futures = new FuturesHelper();
+
         Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
         Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(cache);
 
@@ -489,11 +496,16 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
 
                     @Override
                     public void accept(String sessionId) {
-                        // Remove session from remoteCache too
-                        localCache.remove(sessionId);
+                        // Remove session from remoteCache too. Use removeAsync for better perf
+                        Future future = localCache.removeAsync(sessionId);
+                        futures.addTask(future);
                     }
 
                 });
+
+        futures.waitForAllToFinish();
+
+        log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) futures.size(), realmId, offline);
     }
 
     @Override
@@ -529,6 +541,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
     }
 
     private void removeAllLocalUserLoginFailuresEvent(String realmId) {
+        FuturesHelper futures = new FuturesHelper();
+
         Cache<LoginFailureKey, LoginFailureEntity> localCache = CacheDecorators.localCache(loginFailureCache);
 
         Cache<LoginFailureKey, LoginFailureEntity> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
@@ -539,9 +553,14 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
                 .filter(UserLoginFailurePredicate.create(realmId))
                 .map(Mappers.loginFailureId())
                 .forEach(loginFailureKey -> {
-                    // Remove loginFailure from remoteCache too
-                    localCache.remove(loginFailureKey);
+                    // Remove loginFailure from remoteCache too. Use removeAsync for better perf
+                    Future future = localCache.removeAsync(loginFailureKey);
+                    futures.addTask(future);
                 });
+
+        futures.waitForAllToFinish();
+
+        log.debugf("Removed %d login failures in realm %s", futures.size(), realmId);
     }
 
     @Override
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java
new file mode 100644
index 0000000..6f088dc
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/util/FuturesHelper.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2017 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Not thread-safe. Assumes tasks are added from single thread.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class FuturesHelper {
+
+    private static final Logger log = Logger.getLogger(FuturesHelper.class);
+
+    private final Queue<Future> futures = new LinkedList<>();
+
+
+    public void addTask(Future future) {
+        this.futures.add(future);
+    }
+
+
+    public void waitForAllToFinish() {
+        for (Future future : futures) {
+            try {
+                future.get();
+            } catch (ExecutionException | InterruptedException ee) {
+                log.error("Exception when waiting for future", ee); // TODO Possibly some good mechanism to avoid swamp log with many same exceptions?
+            }
+        }
+    }
+
+
+    public int size() {
+        return futures.size();
+    }
+
+}