keycloak-aplcache

Merge pull request #4647 from mposolda/crossdc KEYCLOAK-5371

11/8/2017 8:14:37 AM

Details

diff --git a/common/src/main/java/org/keycloak/common/util/Retry.java b/common/src/main/java/org/keycloak/common/util/Retry.java
index 50f916e..1a6191e 100644
--- a/common/src/main/java/org/keycloak/common/util/Retry.java
+++ b/common/src/main/java/org/keycloak/common/util/Retry.java
@@ -17,13 +17,16 @@
 
 package org.keycloak.common.util;
 
+import java.util.Random;
+
 /**
  * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
  */
 public class Retry {
 
+
     /**
-     * Runs the given {@code runnable} at most {@code retryCount} times until it passes,
+     * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
      * leaving {@code intervalMillis} milliseconds between the invocations.
      * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
      * @param runnable
@@ -32,14 +35,14 @@ public class Retry {
      * @return Index of the first successful invocation, starting from 0.
      */
     public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
-        int executionIndex = 0;
+        int iteration = 0;
         while (true) {
             try {
                 runnable.run();
-                return executionIndex;
+                return iteration;
             } catch (RuntimeException | AssertionError e) {
                 attemptsCount--;
-                executionIndex++;
+                iteration++;
                 if (attemptsCount > 0) {
                     try {
                         if (intervalMillis > 0) {
@@ -56,8 +59,56 @@ public class Retry {
         }
     }
 
+
+    /**
+     * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes, sleeping for a
+     * random, exponentially growing delay between the invocations (exponential backoff + jitter, see
+     * https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/).
+     * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}; the last
+     * failure is rethrown once the attempts are used up. No sleep happens when {@code intervalBaseMillis <= 0}.
+     *
+     * @param runnable task to execute; it receives the index of the current attempt, starting from 0
+     * @param attemptsCount Total number of attempts to execute the {@code runnable}
+     * @param intervalBaseMillis base for the exponential backoff + jitter
+     * @return Index of the first successful invocation, starting from 0.
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted while waiting
+     */
+    public static int executeWithBackoff(AdvancedRunnable runnable, int attemptsCount, int intervalBaseMillis) {
+        int iteration = 0;
+        while (true) {
+            try {
+                runnable.run(iteration);
+                return iteration;
+            } catch (RuntimeException | AssertionError e) {
+                attemptsCount--;
+                iteration++;
+                if (attemptsCount > 0) {
+                    try {
+                        if (intervalBaseMillis > 0) {
+                            int delay = computeBackoffInterval(intervalBaseMillis, iteration);
+                            Thread.sleep(delay);
+                        }
+                    } catch (InterruptedException ie) {
+                        // Restore the interrupt flag so code further up the stack can still observe the interruption.
+                        Thread.currentThread().interrupt();
+                        ie.addSuppressed(e);
+                        throw new RuntimeException(ie);
+                    }
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /** Picks a random delay in [0, base * 2^iteration); the bound is computed in long and capped at Integer.MAX_VALUE so it cannot overflow into a value nextInt rejects. */
+    private static int computeBackoffInterval(int base, int iteration) {
+        long iterationBase = Math.min((long) base << Math.min(iteration, 31), Integer.MAX_VALUE);
+        return new Random().nextInt((int) iterationBase);
+    }
+
     /**
-     * Runs the given {@code runnable} at most {@code retryCount} times until it passes,
+     * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
      * leaving {@code intervalMillis} milliseconds between the invocations.
      * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
      * @param supplier
@@ -66,11 +117,13 @@ public class Retry {
      * @return Value generated by the {@code supplier}.
      */
     public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
+        int iteration = 0;
         while (true) {
             try {
-                return supplier.get();
+                return supplier.get(iteration);
             } catch (RuntimeException | AssertionError e) {
                 attemptsCount--;
+                iteration++;
                 if (attemptsCount > 0) {
                     try {
                         if (intervalMillis > 0) {
@@ -89,7 +142,18 @@ public class Retry {
 
 
     /**
-     * Needed here just because java.util.function.Supplier defined from Java 8
+     * Runnable, which provides some additional info (iteration for now)
+     */
+    public interface AdvancedRunnable {
+        /** @param iteration index of the current attempt; {@code executeWithBackoff} passes 0 for the first one */
+        void run(int iteration);
+
+    }
+
+    /**
+     * Needed here because:
+     * - java.util.function.Supplier defined from Java 8
+     * - Adds some additional info (current iteration)
      */
     public interface Supplier<T> {
 
@@ -98,7 +162,7 @@ public class Retry {
          *
          * @return a result
          */
-        T get();
+        T get(int iteration);
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
index 65946a4..847e144 100644
--- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java
@@ -222,11 +222,8 @@ public class InfinispanNotificationsManager {
 
                 });
             } catch (RejectedExecutionException ree) {
-                logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key);
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug(ree.getMessage(), ree);
-                }
+                logger.errorf("Rejected submitting of the event for key: %s. Value: %s, Server going to shutdown or pool exhausted. Pool: %s", key, workCache.get(key), listenersExecutor.toString());
+                throw ree;
             }
         }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java
index 1928733..463b777 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.commons.api.BasicCache;
 import org.jboss.logging.Logger;
 import org.keycloak.common.util.Retry;
@@ -49,24 +50,20 @@ public class InfinispanCodeToTokenStoreProvider implements CodeToTokenStoreProvi
 
         int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan();
 
-        boolean codeAlreadyExists = Retry.call(() -> {
-
-            try {
-                BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
-                ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
-                return existing == null;
-            } catch (RuntimeException re) {
-                if (logger.isDebugEnabled()) {
-                    logger.debugf(re, "Failed when adding code %s", codeId);
-                }
-
-                // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
-                throw re;
+        try {
+            BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
+            ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
+            return existing == null;
+        } catch (HotRodClientException re) {
+            // No need to retry. The hotrod (remoteCache) has some retries in itself in case of some random network error happened.
+            // In case of lock conflict, we don't want to retry anyway as there was likely an attempt to use the code from different place.
+            if (logger.isDebugEnabled()) {
+                logger.debugf(re, "Failed when adding code %s", codeId);
             }
 
-        }, 3, 0);
+            return false;
+        }
 
-        return codeAlreadyExists;
     }
 
     @Override
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java
index 58b83a0..f2221cb 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
 import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
@@ -28,6 +29,7 @@ import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
 import org.infinispan.client.hotrod.event.ClientEvent;
 import org.jboss.logging.Logger;
 import org.keycloak.common.util.MultivaluedHashMap;
+import org.keycloak.common.util.Time;
 
 import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
 import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;
@@ -94,24 +96,40 @@ public class ClientListenerExecutorDecorator<K> {
 
     // Assume it's called from the synchronized block
     private void submitImpl(K key, MyClientEvent event, Runnable r) {
-        logger.debugf("Submitting event to the executor: %s", event.toString());
+        logger.debugf("Submitting event to the executor: %s . eventsInProgress size: %d, eventsQueue size: %d", event.toString(), eventsInProgress.size(), eventsQueue.size());
 
         eventsInProgress.put(key, event);
 
         Runnable decoratedRunnable = () -> {
+            Long start = null; // stays null unless debug logging is enabled when the task starts
             try {
+                if (logger.isDebugEnabled()) {
+                    start = Time.currentTimeMillis();
+                }
+
                 r.run();
             } finally {
                 synchronized (lock) {
-                    logger.debugf("Finished processing event by the executor: %s", event.toString());
                     eventsInProgress.remove(key);
 
+                    if (start != null && logger.isDebugEnabled()) { // start is null if debug was switched on mid-task; unboxing it would throw and skip pollQueue
+                        long took = Time.currentTimeMillis() - start;
+                        logger.debugf("Finished processing event by the executor: %s, took: %d ms. EventsInProgress size: %d", event.toString(), took, eventsInProgress.size());
+                    }
+
                     pollQueue(key);
                 }
             }
         };
 
-        decorated.submit(decoratedRunnable);
+        try {
+            decorated.submit(decoratedRunnable);
+        } catch (RejectedExecutionException ree) {
+            eventsInProgress.remove(key); // the task will never run, so its finally block cannot clear the entry
+
+            logger.errorf("Rejected execution of task for the event '%s' . Try to increase the pool size. Pool is '%s'", event.toString(), decorated.toString());
+            throw ree;
+        }
     }
 
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
index 49024ac..404d3ca 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java
@@ -17,6 +17,7 @@
 
 package org.keycloak.models.sessions.infinispan.remotestore;
 
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.keycloak.common.util.Retry;
 import org.keycloak.common.util.Time;
 import java.util.Collections;
@@ -69,7 +70,9 @@ public class RemoteCacheInvoker {
         SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
 
         if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
-            logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
+            if (logger.isTraceEnabled()) {
+                logger.tracef("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
+            }
             return;
         }
 
@@ -78,23 +81,25 @@ public class RemoteCacheInvoker {
         // Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh)
         final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2;
 
-        logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
+        if (logger.isTraceEnabled()) {
+            logger.tracef("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
+        }
 
-        Retry.execute(() -> {
+        Retry.executeWithBackoff((int iteration) -> {
 
             try {
                 runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
-            } catch (RuntimeException re) {
+            } catch (HotRodClientException re) {
                 if (logger.isDebugEnabled()) {
-                    logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s' . Will try to retry the task",
-                            operation, cacheName, key);
+                    logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s', iteration '%s'. Will try to retry the task",
+                            operation, cacheName, key, iteration);
                 }
 
                 // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
                 throw re;
             }
 
-        }, 10, 0);
+        }, 10, 10);
     }
 
 
@@ -146,15 +151,17 @@ public class RemoteCacheInvoker {
             // Run task on the remote session
             task.runUpdate(session);
 
-            logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
+            if (logger.isTraceEnabled()) {
+                logger.tracef("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
+            }
 
             replaced = remoteCache.replaceWithVersion(key, SessionEntityWrapper.forTransport(session), versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
 
             if (!replaced) {
                 logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
             } else {
-                if (logger.isDebugEnabled()) {
-                    logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
+                if (logger.isTraceEnabled()) {
+                    logger.tracef("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
                 }
             }
         }
diff --git a/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java
index adc761d..4d7ccb4 100644
--- a/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java
+++ b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java
@@ -45,7 +45,7 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory
     protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class);
 
     private static final int DEFAULT_MIN_THREADS = 4;
-    private static final int DEFAULT_MAX_THREADS = 16;
+    private static final int DEFAULT_MAX_THREADS = 64;
 
     private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";