ConcurrencyJDGSessionsCacheTest.java

400 lines | 15.365 kB Blame History Raw Download
/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.remotestore.KcRemoteStore;
import org.keycloak.models.sessions.infinispan.remotestore.KcRemoteStoreConfigurationBuilder;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;

/**
 * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract.
 *
 * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGSessionsCacheTest {

    protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);

    private static final int ITERATION_PER_WORKER = 1000;

    private static RemoteCache remoteCache1;
    private static RemoteCache remoteCache2;

    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
    private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);

    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);

    //private static Map<String, EntryInfo> state = new HashMap<>();

    public static void main(String[] args) throws Exception {
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);

        // Create initial item
        UserSessionEntity session = new UserSessionEntity();
        session.setId("123");
        session.setRealm("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
        clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
        session.getAuthenticatedClientSessions().put("client1", clientSession);

        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);

        // Some dummy testing of remoteStore behaviour
        logger.info("Before put");

        cache1
                .getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore . Just doesn't propagate to cluster
                .put("123", wrappedSession);

        logger.info("After put");

        cache1.replace("123",  wrappedSession);

        logger.info("After replace");

        cache1.get("123");

        logger.info("After cache1.get");

        cache2.get("123");

        logger.info("After cache2.get");

        cache1.get("123");

        logger.info("After cache1.get - second call");

        cache2.get("123");

        logger.info("After cache2.get - second call");

        cache2.replace("123",  wrappedSession);

        logger.info("After replace - second call");

        cache1.get("123");

        logger.info("After cache1.get - third call");

        cache2.get("123");

        logger.info("After cache2.get - third call");

        cache1
                .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD)
                .entrySet().stream().forEach(e -> {
        });

        logger.info("After cache1.stream");

        // Explicitly call put on remoteCache (KcRemoteCache.write ignores remote writes)
        InfinispanUtil.getRemoteCache(cache1).put("123", session);

        // Create caches, listeners and finally worker threads
        Thread worker1 = createWorker(cache1, 1);
        Thread worker2 = createWorker(cache2, 2);

        long start = System.currentTimeMillis();

        // Start and join workers
        worker1.start();
        worker2.start();

        worker1.join();
        worker2.join();

        long took = System.currentTimeMillis() - start;

//        // Output
//        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
//            System.out.println(entry.getKey() + ":::" + entry.getValue());
//            worker1.cache.remove(entry.getKey());
//        }

        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );

        System.out.println("Sleeping before other report");

        Thread.sleep(1000);

        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());



        // Finish JVM
        cache1.getCacheManager().stop();
        cache2.getCacheManager().stop();
    }

    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
        System.out.println("Retrieved cache: " + threadId);

        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);

        if (threadId == 1) {
            remoteCache1 = remoteCache;
        } else {
            remoteCache2 = remoteCache;
        }

        AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
        HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
        remoteCache.addClientListener(listener);

        return new RemoteCacheWorker(remoteCache, threadId);
        //return new CacheWorker(cache, threadId);
    }

    private static EmbeddedCacheManager createManager(int threadId) {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("jgroups.tcp.port", "53715");
        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();

        boolean clustered = false;
        boolean async = false;
        boolean allowDuplicateJMXDomains = true;

        if (clustered) {
            gcb = gcb.clusteredDefault();
            gcb.transport().clusterName("test-clustering");
        }

        gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);

        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());

        Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);

        cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
        return cacheManager;

    }

    private static Configuration getCacheBackedByRemoteStore(int threadId) {
        ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();

        int port = threadId==1 ? 12232 : 13232;
        //int port = 12232;

        return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
                .fetchPersistentState(false)
                .ignoreModifications(false)
                .purgeOnStartup(false)
                .preload(false)
                .shared(true)
                .remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
                .rawValues(true)
                .forceReturnValues(false)
                .marshaller(KeycloakHotRodMarshallerFactory.class.getName())
                .addServer()
                    .host("localhost")
                    .port(port)
                .connectionPool()
                    .maxActive(20)
                    .exhaustedAction(ExhaustedAction.CREATE_NEW)
                .async()
                    .enabled(false).build();
    }

    @ClientListener
    public static class HotRodListener {

        private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
        private RemoteCache remoteCache;
        private AtomicInteger listenerCount;

        public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
            this.listenerCount = listenerCount;
            this.remoteCache = remoteCache;
            this.origCache = origCache;
        }

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent event) {
            String cacheKey = (String) event.getKey();
            listenerCount.incrementAndGet();
        }

        @ClientCacheEntryModified
        public void updated(ClientCacheEntryModifiedEvent event) {
            String cacheKey = (String) event.getKey();
            listenerCount.incrementAndGet();

            // TODO: can be optimized
            SessionEntity session = (SessionEntity) remoteCache.get(cacheKey);
            SessionEntityWrapper sessionWrapper = new SessionEntityWrapper(session);

            // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
            origCache
                    .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
                    .replace(cacheKey, sessionWrapper);
        }




    }

    private static class RemoteCacheWorker extends Thread {

        private final RemoteCache<String, UserSessionEntity> remoteCache;

        private final int myThreadId;

        private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
            this.remoteCache = remoteCache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {

                String noteKey = "n-" + myThreadId + "-" + i;

                boolean replaced = false;
                while (!replaced) {
                    VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
                    UserSessionEntity oldSession = versioned.getValue();
                    //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                    UserSessionEntity clone = oldSession;

                    clone.getNotes().put(noteKey, "someVal");
                    //cache.replace("123", clone);
                    replaced = cacheReplace(versioned, clone);
                }

                // Try to see if remoteCache on 2nd DC is immediatelly seeing our change
                RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
                UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123");

                Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey));
                //System.out.println("Passed");
            }

        }

        private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
            try {
                boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
                //cache.replace("123", newSession);
                if (!replaced) {
                    failedReplaceCounter.incrementAndGet();
                    //return false;
                    //System.out.println("Replace failed!!!");
                }
                return replaced;
            } catch (Exception re) {
                failedReplaceCounter2.incrementAndGet();
                return false;
            }
            //return replaced;
        }

    }
/*
    // Worker, which operates on "classic" cache and rely on operations delegated to the second cache
    private static class CacheWorker extends Thread {

        private final Cache<String, SessionEntityWrapper<UserSessionEntity>> cache;

        private final int myThreadId;

        private CacheWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int myThreadId) {
            this.cache = cache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {

                String noteKey = "n-" + myThreadId + "-" + i;

                boolean replaced = false;
                while (!replaced) {
                    VersionedValue<UserSessionEntity> versioned = cache.getVersioned("123");
                    UserSessionEntity oldSession = versioned.getValue();
                    //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                    UserSessionEntity clone = oldSession;

                    clone.getNotes().put(noteKey, "someVal");
                    //cache.replace("123", clone);
                    replaced = cacheReplace(versioned, clone);
                }
            }

        }

    }*/


}