keycloak-memoizeit
Changes
model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java 186(+186 -0)
model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java 199(+199 -0)
model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java 9(+9 -0)
model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java 1(+1 -0)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java 49(+31 -18)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java 9(+4 -5)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java 49(+25 -24)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java 6(+5 -1)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java 10(+5 -5)
model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java 2(+1 -1)
model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory 18(+18 -0)
services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java 4(+2 -2)
services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java 2(+1 -1)
services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java 68(+68 -0)
testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java 20(+18 -2)
testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java 16(+13 -3)
Details
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java
new file mode 100644
index 0000000..9986dad
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+import org.infinispan.Cache;
+import org.infinispan.context.Flag;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.remoting.transport.Transport;
+import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
+import org.keycloak.common.util.Time;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.KeycloakSession;
+
+/**
+ * Various utils related to clustering
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class InfinispanClusterProvider implements ClusterProvider {
+
+ protected static final Logger logger = Logger.getLogger(InfinispanClusterProvider.class);
+
+ public static final String CLUSTER_STARTUP_TIME_KEY = "cluster-start-time";
+ private static final String TASK_KEY_PREFIX = "task::";
+
+ private final InfinispanClusterProviderFactory factory;
+ private final KeycloakSession session;
+ private final Cache<String, Serializable> cache;
+
+ public InfinispanClusterProvider(InfinispanClusterProviderFactory factory, KeycloakSession session, Cache<String, Serializable> cache) {
+ this.factory = factory;
+ this.session = session;
+ this.cache = cache;
+ }
+
+
+ @Override
+ public int getClusterStartupTime() {
+ Integer existingClusterStartTime = (Integer) cache.get(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY);
+ if (existingClusterStartTime != null) {
+ return existingClusterStartTime;
+ } else {
+ // clusterStartTime not yet initialized. Let's try to put our startupTime
+ int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
+
+ existingClusterStartTime = (Integer) cache.putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
+ if (existingClusterStartTime == null) {
+ logger.infof("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
+ return serverStartTime;
+ } else {
+ return existingClusterStartTime;
+ }
+ }
+ }
+
+
+ @Override
+ public void close() {
+ }
+
+
+ @Override
+ public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
+ String cacheKey = TASK_KEY_PREFIX + taskKey;
+ boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
+ if (locked) {
+ try {
+ try {
+ T result = task.call();
+ return ExecutionResult.executed(result);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
+ }
+ } finally {
+ removeFromCache(cacheKey);
+ }
+ } else {
+ return ExecutionResult.notExecuted();
+ }
+ }
+
+
+ @Override
+ public void registerListener(String taskKey, ClusterListener task) {
+ factory.registerListener(taskKey, task);
+ }
+
+
+ @Override
+ public void notify(String taskKey, ClusterEvent event) {
+ // Put the value to the cache to notify listeners on all the nodes
+ cache.put(taskKey, event);
+ }
+
+
+ private String getCurrentNode(Cache<String, Serializable> cache) {
+ Transport transport = cache.getCacheManager().getTransport();
+ return transport==null ? "local" : transport.getAddress().toString();
+ }
+
+
+ private LockEntry createLockEntry(Cache<String, Serializable> cache) {
+ LockEntry lock = new LockEntry();
+ lock.setNode(getCurrentNode(cache));
+ lock.setTimestamp(Time.currentTime());
+ return lock;
+ }
+
+
+ private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
+ LockEntry myLock = createLockEntry(cache);
+
+ LockEntry existingLock = (LockEntry) cache.putIfAbsent(cacheKey, myLock);
+ if (existingLock != null) {
+ // Task likely already in progress. Check if timestamp is not outdated
+ int thatTime = existingLock.getTimestamp();
+ int currentTime = Time.currentTime();
+ if (thatTime + taskTimeoutInSeconds < currentTime) {
+ logger.infof("Task %s outdated when in progress by node %s. Will try to replace task with our node %s", cacheKey, existingLock.getNode(), myLock.getNode());
+ boolean replaced = cache.replace(cacheKey, existingLock, myLock);
+ // TODO: trace
+ if (!replaced) {
+ logger.infof("Failed to replace the task %s. Other thread replaced in the meantime. Ignoring task.", cacheKey);
+ }
+ return replaced;
+ } else {
+ logger.infof("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
+ return false;
+ }
+ } else {
+ logger.infof("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.getNode());
+ return true;
+ }
+ }
+
+
+ private void removeFromCache(String cacheKey) {
+ // 3 attempts to send the message (it may fail if some node fails in the meantime)
+ int retry = 3;
+ while (true) {
+ try {
+ cache.getAdvancedCache()
+ .withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
+ .remove(cacheKey);
+ logger.infof("Task %s removed from the cache", cacheKey);
+ return;
+ } catch (RuntimeException e) {
+ ComponentStatus status = cache.getStatus();
+ if (status.isStopping() || status.isTerminated()) {
+ logger.warnf("Failed to remove task %s from the cache. Cache is already terminating", cacheKey);
+ logger.debug(e.getMessage(), e);
+ return;
+ }
+ retry--;
+ if (retry == 0) {
+ throw e;
+ }
+ }
+ }
+ }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
new file mode 100644
index 0000000..48adfe1
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+import org.jboss.logging.Logger;
+import org.keycloak.Config;
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ClusterProviderFactory;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakSessionFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class InfinispanClusterProviderFactory implements ClusterProviderFactory {
+
+ public static final String PROVIDER_ID = "infinispan";
+
+ protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);
+
+ private volatile Cache<String, Serializable> workCache;
+
+ private Map<String, ClusterListener> listeners = new HashMap<>();
+
+ @Override
+ public ClusterProvider create(KeycloakSession session) {
+ lazyInit(session);
+ return new InfinispanClusterProvider(this, session, workCache);
+ }
+
+ private void lazyInit(KeycloakSession session) {
+ if (workCache == null) {
+ synchronized (this) {
+ if (workCache == null) {
+ workCache = session.getProvider(InfinispanConnectionProvider.class).getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+ workCache.getCacheManager().addListener(new ViewChangeListener());
+ workCache.addListener(new CacheEntryListener());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void init(Config.Scope config) {
+ }
+
+ @Override
+ public void postInit(KeycloakSessionFactory factory) {
+ }
+
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public String getId() {
+ return PROVIDER_ID;
+ }
+
+
+ @Listener
+ public class ViewChangeListener {
+
+ @ViewChanged
+ public void viewChanged(ViewChangedEvent event) {
+ EmbeddedCacheManager cacheManager = event.getCacheManager();
+ Transport transport = cacheManager.getTransport();
+
+ // Coordinator makes sure that entries for outdated nodes are cleaned up
+ if (transport != null && transport.isCoordinator()) {
+
+ Set<String> newAddresses = convertAddresses(event.getNewMembers());
+ Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
+ removedNodesAddresses.removeAll(newAddresses);
+
+ if (removedNodesAddresses.isEmpty()) {
+ return;
+ }
+
+ logger.infof("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
+
+ Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+
+ Iterator<String> toRemove = cache.entrySet().stream().filter(new Predicate<Map.Entry<String, Serializable>>() {
+
+ @Override
+ public boolean test(Map.Entry<String, Serializable> entry) {
+ if (!(entry.getValue() instanceof LockEntry)) {
+ return false;
+ }
+
+ LockEntry lock = (LockEntry) entry.getValue();
+ return removedNodesAddresses.contains(lock.getNode());
+ }
+
+ }).map(new Function<Map.Entry<String, Serializable>, String>() {
+
+ @Override
+ public String apply(Map.Entry<String, Serializable> entry) {
+ return entry.getKey();
+ }
+
+ }).iterator();
+
+ while (toRemove.hasNext()) {
+ String rem = toRemove.next();
+ logger.infof("Removing task %s due it's node left cluster", rem);
+ cache.remove(rem);
+ }
+ }
+ }
+
+ private Set<String> convertAddresses(Collection<Address> addresses) {
+ return addresses.stream().map(new Function<Address, String>() {
+
+ @Override
+ public String apply(Address address) {
+ return address.toString();
+ }
+
+ }).collect(Collectors.toSet());
+ }
+
+ }
+
+
+ <T> void registerListener(String taskKey, ClusterListener task) {
+ listeners.put(taskKey, task);
+ }
+
+ @Listener
+ public class CacheEntryListener {
+
+ @CacheEntryCreated
+ public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
+ if (!event.isPre()) {
+ trigger(event.getKey(), event.getValue());
+ }
+ }
+
+ @CacheEntryModified
+ public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
+ if (!event.isPre()) {
+ trigger(event.getKey(), event.getValue());
+ }
+ }
+
+ private void trigger(String key, Object value) {
+ ClusterListener task = listeners.get(key);
+ if (task != null) {
+ ClusterEvent event = (ClusterEvent) value;
+ task.run(event);
+ }
+ }
+ }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java
new file mode 100644
index 0000000..f6a795a
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class LockEntry implements Serializable {
+
+ private String node;
+ private int timestamp;
+
+ public String getNode() {
+ return node;
+ }
+
+ public void setNode(String node) {
+ this.node = node;
+ }
+
+ public int getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(int timestamp) {
+ this.timestamp = timestamp;
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
index b90ef33..01d229f 100755
--- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
@@ -23,6 +23,8 @@ import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.transaction.LockingMode;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.models.KeycloakSession;
@@ -145,6 +147,13 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, sessionCacheConfiguration);
cacheManager.defineConfiguration(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, sessionCacheConfiguration);
cacheManager.defineConfiguration(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME, sessionCacheConfiguration);
+
+ ConfigurationBuilder replicationConfigBuilder = new ConfigurationBuilder();
+ if (clustered) {
+ replicationConfigBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
+ }
+ Configuration replicationCacheConfiguration = replicationConfigBuilder.build();
+ cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, replicationCacheConfiguration);
}
}
diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
index 2c3ca14..b37beb9 100644
--- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
@@ -30,6 +30,7 @@ public interface InfinispanConnectionProvider extends Provider {
static final String SESSION_CACHE_NAME = "sessions";
static final String OFFLINE_SESSION_CACHE_NAME = "offlineSessions";
static final String LOGIN_FAILURE_CACHE_NAME = "loginFailures";
+ static final String WORK_CACHE_NAME = "work";
<K, V> Cache<K, V> getCache(String name);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index 1fc04de..8602834 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -21,15 +21,41 @@ import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
-import org.keycloak.models.*;
+import org.keycloak.models.ClientInitialAccessModel;
+import org.keycloak.models.ClientModel;
+import org.keycloak.models.ClientSessionModel;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakTransaction;
+import org.keycloak.models.RealmModel;
+import org.keycloak.models.UserModel;
+import org.keycloak.models.UserSessionModel;
+import org.keycloak.models.UserSessionProvider;
+import org.keycloak.models.UsernameLoginFailureModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
-import org.keycloak.models.sessions.infinispan.entities.*;
-import org.keycloak.models.sessions.infinispan.initializer.TimeAwareInitializerState;
-import org.keycloak.models.sessions.infinispan.stream.*;
+import org.keycloak.models.sessions.infinispan.entities.ClientInitialAccessEntity;
+import org.keycloak.models.sessions.infinispan.entities.ClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
+import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.stream.ClientInitialAccessPredicate;
+import org.keycloak.models.sessions.infinispan.stream.ClientSessionPredicate;
+import org.keycloak.models.sessions.infinispan.stream.Comparators;
+import org.keycloak.models.sessions.infinispan.stream.Mappers;
+import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
+import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
+import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.RealmInfoUtil;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -412,19 +438,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
@Override
- public int getClusterStartupTime() {
- TimeAwareInitializerState state = (TimeAwareInitializerState) offlineSessionCache.get(InfinispanUserSessionProviderFactory.SESSION_INITIALIZER_STATE_KEY);
- int startTime;
- if (state == null) {
- log.warn("Cluster startup time not yet available. Fallback to local startup time");
- startTime = (int)(session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
- } else {
- startTime = state.getClusterStartupTime();
- }
- return startTime;
- }
-
- @Override
public void close() {
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
index 1bb82a1..a7c312f 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
@@ -17,6 +17,8 @@
package org.keycloak.models.sessions.infinispan;
+import java.io.Serializable;
+
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
@@ -34,9 +36,6 @@ import org.keycloak.provider.ProviderEventListener;
public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory {
- private static final String STATE_KEY_PREFIX = "initializerState";
- public static final String SESSION_INITIALIZER_STATE_KEY = STATE_KEY_PREFIX + "::offlineUserSessions";
-
private static final Logger log = Logger.getLogger(InfinispanUserSessionProviderFactory.class);
private Config.Scope config;
@@ -85,9 +84,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
@Override
public void run(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
- Cache<String, SessionEntity> cache = connections.getCache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME);
+ Cache<String, Serializable> cache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
- InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, SESSION_INITIALIZER_STATE_KEY);
+ InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, "offlineUserSessions");
initializer.initCache();
initializer.loadPersistentSessions();
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
index deae897..8dd9575 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
@@ -20,57 +20,55 @@ package org.keycloak.models.sessions.infinispan.initializer;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.distexec.DefaultExecutorService;
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
-import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
-import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
-import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* Startup initialization for reading persistent userSessions/clientSessions to be filled into infinispan/memory . In cluster,
* the initialization is distributed among all cluster nodes, so the startup time is even faster
*
+ * TODO: Move to clusterService. Implementation is already pretty generic and doesn't contain any "userSession" specific stuff. All logic is in the SessionLoader implementation
+ *
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanUserSessionInitializer {
+ private static final String STATE_KEY_PREFIX = "distributed::";
+
private static final Logger log = Logger.getLogger(InfinispanUserSessionInitializer.class);
private final KeycloakSessionFactory sessionFactory;
- private final Cache<String, SessionEntity> cache;
+ private final Cache<String, Serializable> workCache;
private final SessionLoader sessionLoader;
private final int maxErrors;
private final int sessionsPerSegment;
private final String stateKey;
- public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, SessionEntity> cache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKey) {
+ public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKeySuffix) {
this.sessionFactory = sessionFactory;
- this.cache = cache;
+ this.workCache = workCache;
this.sessionLoader = sessionLoader;
this.maxErrors = maxErrors;
this.sessionsPerSegment = sessionsPerSegment;
- this.stateKey = stateKey;
+ this.stateKey = STATE_KEY_PREFIX + stateKeySuffix;
}
public void initCache() {
- this.cache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
+ this.workCache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
}
@@ -94,16 +92,14 @@ public class InfinispanUserSessionInitializer {
private boolean isFinished() {
- InitializerState state = (InitializerState) cache.get(stateKey);
+ InitializerState state = (InitializerState) workCache.get(stateKey);
return state != null && state.isFinished();
}
private InitializerState getOrCreateInitializerState() {
- TimeAwareInitializerState state = (TimeAwareInitializerState) cache.get(stateKey);
+ InitializerState state = (InitializerState) workCache.get(stateKey);
if (state == null) {
- int startTime = (int)(sessionFactory.getServerStartupTimestamp() / 1000);
-
final int[] count = new int[1];
// Rather use separate transactions for update and counting
@@ -111,7 +107,7 @@ public class InfinispanUserSessionInitializer {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
- sessionLoader.init(session, startTime);
+ sessionLoader.init(session);
}
});
@@ -124,9 +120,8 @@ public class InfinispanUserSessionInitializer {
});
- state = new TimeAwareInitializerState();
+ state = new InitializerState();
state.init(count[0], sessionsPerSegment);
- state.setClusterStartupTime(startTime);
saveStateToCache(state);
}
return state;
@@ -143,7 +138,7 @@ public class InfinispanUserSessionInitializer {
public void run() {
// Save this synchronously to ensure all nodes read correct state
- InfinispanUserSessionInitializer.this.cache.getAdvancedCache().
+ InfinispanUserSessionInitializer.this.workCache.getAdvancedCache().
withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
.put(stateKey, state);
}
@@ -153,7 +148,7 @@ public class InfinispanUserSessionInitializer {
private boolean isCoordinator() {
- Transport transport = cache.getCacheManager().getTransport();
+ Transport transport = workCache.getCacheManager().getTransport();
return transport == null || transport.isCoordinator();
}
@@ -166,9 +161,9 @@ public class InfinispanUserSessionInitializer {
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService localExecutor = Executors.newCachedThreadPool();
- Transport transport = cache.getCacheManager().getTransport();
+ Transport transport = workCache.getCacheManager().getTransport();
boolean distributed = transport != null;
- ExecutorService executorService = distributed ? new DefaultExecutorService(cache, localExecutor) : localExecutor;
+ ExecutorService executorService = distributed ? new DefaultExecutorService(workCache, localExecutor) : localExecutor;
int errors = 0;
@@ -190,7 +185,7 @@ public class InfinispanUserSessionInitializer {
SessionInitializerWorker worker = new SessionInitializerWorker();
worker.setWorkerEnvironment(segment, sessionsPerSegment, sessionLoader);
if (!distributed) {
- worker.setEnvironment(cache, null);
+ worker.setEnvironment(workCache, null);
}
Future<WorkerResult> future = executorService.submit(worker);
@@ -242,6 +237,12 @@ public class InfinispanUserSessionInitializer {
runnable.run();
return;
} catch (RuntimeException e) {
+ ComponentStatus status = workCache.getStatus();
+ if (status.isStopping() || status.isTerminated()) {
+ log.warn("Failed to put initializerState to the cache. Cache is already terminating");
+ log.debug(e.getMessage(), e);
+ return;
+ }
retry--;
if (retry == 0) {
throw e;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
index 8b651c0..352372c 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
@@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.initializer;
import java.util.List;
import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.ClientSessionModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionModel;
@@ -33,9 +34,12 @@ public class OfflineUserSessionLoader implements SessionLoader {
private static final Logger log = Logger.getLogger(OfflineUserSessionLoader.class);
@Override
- public void init(KeycloakSession session, int clusterStartupTime) {
+ public void init(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
+ // TODO: check if update of timestamps in persister can be skipped entirely
+ int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
+
log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
persister.clearDetachedUserSessions();
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
index eba3cea..b5b8557 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
@@ -32,7 +32,7 @@ import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
-public class SessionInitializerWorker implements DistributedCallable<String, SessionEntity, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
+public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
@@ -40,7 +40,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
private int sessionsPerSegment;
private SessionLoader sessionLoader;
- private transient Cache<String, SessionEntity> cache;
+ private transient Cache<String, Serializable> workCache;
public void setWorkerEnvironment(int segment, int sessionsPerSegment, SessionLoader sessionLoader) {
this.segment = segment;
@@ -49,8 +49,8 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
}
@Override
- public void setEnvironment(Cache<String, SessionEntity> cache, Set<String> inputKeys) {
- this.cache = cache;
+ public void setEnvironment(Cache<String, Serializable> workCache, Set<String> inputKeys) {
+ this.workCache = workCache;
}
@Override
@@ -59,7 +59,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
log.tracef("Running computation for segment: %d", segment);
}
- KeycloakSessionFactory sessionFactory = cache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
+ KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.warnf("KeycloakSessionFactory not yet set in cache. Worker skipped");
return InfinispanUserSessionInitializer.WorkerResult.create(segment, false);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
index b8aa0f8..3185a39 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
@@ -26,7 +26,7 @@ import org.keycloak.models.KeycloakSession;
*/
public interface SessionLoader extends Serializable {
- void init(KeycloakSession session, int clusterStartupTime);
+ void init(KeycloakSession session);
int getSessionsCount(KeycloakSession session);
diff --git a/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory b/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory
new file mode 100644
index 0000000..c4c555f
--- /dev/null
+++ b/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory
@@ -0,0 +1,18 @@
+#
+# Copyright 2016 Red Hat, Inc. and/or its affiliates
+# and other contributors as indicated by the @author tags.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory
\ No newline at end of file
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java
new file mode 100644
index 0000000..b29848c
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import java.io.Serializable;
+
+/**
+ * Task to be executed on all cluster nodes once it's notified.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterListener {
+
+ /**
+ * Registered task to be executed on all cluster nodes once it's notified from cache.
+ *
+ * @param event value of notification (Object added into the cache)
+ */
+ void run(ClusterEvent event);
+
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java
new file mode 100644
index 0000000..70b64a1
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+
+import java.util.concurrent.Callable;
+
+import org.keycloak.provider.Provider;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterProvider extends Provider {
+
+ /**
+ * Will use startup time of this server in non-cluster environment. Otherwise the value is same for all cluster nodes
+ */
+ int getClusterStartupTime();
+
+
+ /**
+ * Execute given task just if it's not already in progress (either on this or any other cluster node).
+ *
+ * @param taskKey
+ * @param taskTimeoutInSeconds timeout for given task. If there is existing task in progress for longer time, it's considered outdated so we will start our task.
+ * @param task
+ * @param <T>
+ * @return result with "executed" flag specifying if execution was executed or ignored.
+ */
+ <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task);
+
+
+ /**
+ * Register task (listener) under given key. When this key will be put to the cache on any cluster node, the task will be executed
+ *
+ * @param taskKey
+ * @param task
+ */
+ void registerListener(String taskKey, ClusterListener task);
+
+
+ /**
+ * Notify registered listeners on all cluster nodes
+ *
+ * @param taskKey
+ * @param event
+ */
+ void notify(String taskKey, ClusterEvent event);
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java
new file mode 100644
index 0000000..41c00f4
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import org.keycloak.provider.ProviderFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider> {
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java
new file mode 100644
index 0000000..7eba4e3
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import org.keycloak.provider.Provider;
+import org.keycloak.provider.ProviderFactory;
+import org.keycloak.provider.Spi;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ClusterSpi implements Spi {
+
+ @Override
+ public boolean isInternal() {
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return "cluster";
+ }
+
+ @Override
+ public Class<? extends Provider> getProviderClass() {
+ return ClusterProvider.class;
+ }
+
+ @Override
+ public Class<? extends ProviderFactory> getProviderFactoryClass() {
+ return ClusterProviderFactory.class;
+ }
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java b/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java
new file mode 100644
index 0000000..b70e024
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ExecutionResult<T> {
+
+ private final boolean executed;
+ private final T result;
+
+ private ExecutionResult(boolean executed, T result) {
+ this.executed = executed;
+ this.result = result;
+ }
+
+ public static <T> ExecutionResult<T> executed(T result) {
+ return new ExecutionResult<>(true, result);
+ }
+
+ public static <T> ExecutionResult<T> notExecuted() {
+ return new ExecutionResult<>(false, null);
+ }
+
+ public boolean isExecuted() {
+ return executed;
+ }
+
+ public T getResult() {
+ return result;
+ }
+}
diff --git a/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java b/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
index b8911f2..a9a32f9 100644
--- a/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
@@ -112,4 +112,10 @@ public class UserFederationSyncResult {
public static UserFederationSyncResult empty() {
return new UserFederationSyncResult();
}
+
+ public static UserFederationSyncResult ignored() {
+ UserFederationSyncResult result = new UserFederationSyncResult();
+ result.setIgnored(true);
+ return result;
+ }
}
diff --git a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
index 9d7c413..57cd49c 100755
--- a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
@@ -82,9 +82,6 @@ public interface UserSessionProvider extends Provider {
void removeClientInitialAccessModel(RealmModel realm, String id);
List<ClientInitialAccessModel> listClientInitialAccess(RealmModel realm);
- // Will use startup time of this server in non-cluster environment
- int getClusterStartupTime();
-
void close();
}
diff --git a/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi b/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
index 9e7403b..7bb58fc 100755
--- a/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
+++ b/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
@@ -49,5 +49,6 @@ org.keycloak.authentication.ClientAuthenticatorSpi
org.keycloak.authentication.RequiredActionSpi
org.keycloak.authentication.FormAuthenticatorSpi
org.keycloak.authentication.FormActionSpi
+org.keycloak.cluster.ClusterSpi
diff --git a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
index e4174d9..e580af6 100755
--- a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
+++ b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
@@ -17,6 +17,7 @@
package org.keycloak.protocol.oidc;
+import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.ClientConnection;
import org.keycloak.OAuth2Constants;
import org.keycloak.OAuthErrorException;
@@ -193,7 +194,7 @@ public class TokenManager {
int currentTime = Time.currentTime();
if (realm.isRevokeRefreshToken()) {
- int clusterStartupTime = session.sessions().getClusterStartupTime();
+ int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
if (refreshToken.getIssuedAt() < validation.clientSession.getTimestamp() && (clusterStartupTime != validation.clientSession.getTimestamp())) {
throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
diff --git a/services/src/main/java/org/keycloak/services/managers/RealmManager.java b/services/src/main/java/org/keycloak/services/managers/RealmManager.java
index 55695a0..8b6a732 100755
--- a/services/src/main/java/org/keycloak/services/managers/RealmManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/RealmManager.java
@@ -231,7 +231,7 @@ public class RealmManager implements RealmImporter {
// Remove all periodic syncs for configured federation providers
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
- usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), fedProvider);
+ usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, true);
}
}
return removed;
@@ -434,7 +434,7 @@ public class RealmManager implements RealmImporter {
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
- usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
+ usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
}
return realm;
}
diff --git a/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java b/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
index c44bca5..2177d04 100755
--- a/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
@@ -16,6 +16,10 @@
*/
package org.keycloak.services.managers;
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
@@ -30,13 +34,17 @@ import org.keycloak.services.ServicesLogger;
import org.keycloak.timer.TimerProvider;
+import java.io.Serializable;
import java.util.List;
+import java.util.concurrent.Callable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class UsersSyncManager {
+ private static final String FEDERATION_TASK_KEY = "federation";
+
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
/**
@@ -57,26 +65,106 @@ public class UsersSyncManager {
refreshPeriodicSyncForProvider(sessionFactory, timer, fedProvider, realm.getId());
}
}
+
+ ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+ clusterProvider.registerListener(FEDERATION_TASK_KEY, new UserFederationClusterListener(sessionFactory));
}
});
}
- public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
- final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
- updateLastSyncInterval(sessionFactory, fedProvider, realmId);
- return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
+
+ private class Holder {
+ ExecutionResult<UserFederationSyncResult> result;
+ }
+
+ public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
+ final Holder holder = new Holder();
+
+ // Ensure not executed concurrently on this or any other cluster node
+ KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+ @Override
+ public void run(KeycloakSession session) {
+ ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+ // shared key for "full" and "changed" . Improve if needed
+ String taskKey = fedProvider.getId() + "::sync";
+
+ // 30 seconds minimal timeout for now
+ int timeout = Math.max(30, fedProvider.getFullSyncPeriod());
+ holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
+
+ @Override
+ public UserFederationSyncResult call() throws Exception {
+ final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
+ updateLastSyncInterval(sessionFactory, fedProvider, realmId);
+ return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
+ }
+
+ });
+ }
+
+ });
+
+ if (holder.result == null || !holder.result.isExecuted()) {
+ logger.infof("syncAllUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
+ return UserFederationSyncResult.ignored();
+ } else {
+ return holder.result.getResult();
+ }
+ }
+
+ public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
+ final Holder holder = new Holder();
+
+ // Ensure not executed concurrently on this or any other cluster node
+ KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+ @Override
+ public void run(KeycloakSession session) {
+ ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+ // shared key for "full" and "changed" . Improve if needed
+ String taskKey = fedProvider.getId() + "::sync";
+
+ // 30 seconds minimal timeout for now
+ int timeout = Math.max(30, fedProvider.getChangedSyncPeriod());
+ holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
+
+ @Override
+ public UserFederationSyncResult call() throws Exception {
+ final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
+
+ // See when we did last sync.
+ int oldLastSync = fedProvider.getLastSync();
+ updateLastSyncInterval(sessionFactory, fedProvider, realmId);
+ return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
+ }
+
+ });
+ }
+
+ });
+
+ if (holder.result == null || !holder.result.isExecuted()) {
+ logger.infof("syncChangedUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
+ return UserFederationSyncResult.ignored();
+ } else {
+ return holder.result.getResult();
+ }
}
- public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
- final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
- // See when we did last sync.
- int oldLastSync = fedProvider.getLastSync();
- updateLastSyncInterval(sessionFactory, fedProvider, realmId);
- return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
+ // Ensure all cluster nodes are notified
+ public void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserFederationProviderModel federationProvider, boolean removed) {
+ FederationProviderClusterEvent event = FederationProviderClusterEvent.createEvent(removed, realm.getId(), federationProvider);
+ session.getProvider(ClusterProvider.class).notify(FEDERATION_TASK_KEY, event);
}
- public void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
+
+ // Executed once it receives notification that some UserFederationProvider was created or updated
+ protected void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
+ logger.infof("Going to refresh periodic sync for provider '%s' . Full sync period: %d , changed users sync period: %d",
+ fedProvider.getDisplayName(), fedProvider.getFullSyncPeriod(), fedProvider.getChangedSyncPeriod());
+
if (fedProvider.getFullSyncPeriod() > 0) {
// We want periodic full sync for this provider
timer.schedule(new Runnable() {
@@ -84,7 +172,12 @@ public class UsersSyncManager {
@Override
public void run() {
try {
- syncAllUsers(sessionFactory, realmId, fedProvider);
+ boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
+ if (shouldPerformSync) {
+ syncAllUsers(sessionFactory, realmId, fedProvider);
+ } else {
+ logger.infof("Ignored periodic full sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
+ }
} catch (Throwable t) {
logger.errorDuringFullUserSync(t);
}
@@ -102,7 +195,12 @@ public class UsersSyncManager {
@Override
public void run() {
try {
- syncChangedUsers(sessionFactory, realmId, fedProvider);
+ boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
+ if (shouldPerformSync) {
+ syncChangedUsers(sessionFactory, realmId, fedProvider);
+ } else {
+ logger.infof("Ignored periodic changed-users sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
+ }
} catch (Throwable t) {
logger.errorDuringChangedUserSync(t);
}
@@ -115,7 +213,21 @@ public class UsersSyncManager {
}
}
- public void removePeriodicSyncForProvider(TimerProvider timer, final UserFederationProviderModel fedProvider) {
+ // Skip syncing if there is short time since last sync time.
+ private boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
+ if (lastSyncTime <= 0) {
+ return true;
+ }
+
+ int currentTime = Time.currentTime();
+ int timeSinceLastSync = currentTime - lastSyncTime;
+
+ return (timeSinceLastSync * 2 > period);
+ }
+
+ // Executed once it receives notification that some UserFederationProvider was removed
+ protected void removePeriodicSyncForProvider(TimerProvider timer, UserFederationProviderModel fedProvider) {
+ logger.infof("Removing periodic sync for provider %s", fedProvider.getDisplayName());
timer.cancelTask(fedProvider.getId() + "-FULL");
timer.cancelTask(fedProvider.getId() + "-CHANGED");
}
@@ -144,4 +256,73 @@ public class UsersSyncManager {
});
}
+
+ private class UserFederationClusterListener implements ClusterListener {
+
+ private final KeycloakSessionFactory sessionFactory;
+
+ public UserFederationClusterListener(KeycloakSessionFactory sessionFactory) {
+ this.sessionFactory = sessionFactory;
+ }
+
+ @Override
+ public void run(ClusterEvent event) {
+ final FederationProviderClusterEvent fedEvent = (FederationProviderClusterEvent) event;
+ KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+ @Override
+ public void run(KeycloakSession session) {
+ TimerProvider timer = session.getProvider(TimerProvider.class);
+ if (fedEvent.isRemoved()) {
+ removePeriodicSyncForProvider(timer, fedEvent.getFederationProvider());
+ } else {
+ refreshPeriodicSyncForProvider(sessionFactory, timer, fedEvent.getFederationProvider(), fedEvent.getRealmId());
+ }
+ }
+
+ });
+ }
+ }
+
+
+ // Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
+ public static class FederationProviderClusterEvent implements ClusterEvent {
+
+ private boolean removed;
+ private String realmId;
+ private UserFederationProviderModel federationProvider;
+
+ public boolean isRemoved() {
+ return removed;
+ }
+
+ public void setRemoved(boolean removed) {
+ this.removed = removed;
+ }
+
+ public String getRealmId() {
+ return realmId;
+ }
+
+ public void setRealmId(String realmId) {
+ this.realmId = realmId;
+ }
+
+ public UserFederationProviderModel getFederationProvider() {
+ return federationProvider;
+ }
+
+ public void setFederationProvider(UserFederationProviderModel federationProvider) {
+ this.federationProvider = federationProvider;
+ }
+
+ public static FederationProviderClusterEvent createEvent(boolean removed, String realmId, UserFederationProviderModel fedProvider) {
+ FederationProviderClusterEvent notification = new FederationProviderClusterEvent();
+ notification.setRemoved(removed);
+ notification.setRealmId(realmId);
+ notification.setFederationProvider(fedProvider);
+ return notification;
+ }
+ }
+
}
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java b/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
index 15b0e81..f0cbe8b 100644
--- a/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
@@ -242,7 +242,7 @@ public class RealmAdminResource {
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
- usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
+ usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
}
adminEvent.operation(OperationType.UPDATE).representation(rep).success();
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
index 1000ead..0eb1748 100755
--- a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
@@ -106,7 +106,7 @@ public class UserFederationProviderResource {
UserFederationProviderModel model = new UserFederationProviderModel(rep.getId(), rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
realm.updateUserFederationProvider(model);
- new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
+ new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
boolean kerberosCredsAdded = UserFederationProvidersResource.checkKerberosCredential(session, realm, model);
if (kerberosCredsAdded) {
logger.addedKerberosToRealmCredentials();
@@ -138,7 +138,7 @@ public class UserFederationProviderResource {
auth.requireManage();
realm.removeUserFederationProvider(this.federationProviderModel);
- new UsersSyncManager().removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), this.federationProviderModel);
+ new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, this.federationProviderModel, true);
adminEvent.operation(OperationType.DELETE).resourcePath(uriInfo).success();
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
index 30a93e4..995cda4 100755
--- a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
@@ -178,7 +178,7 @@ public class UserFederationProvidersResource {
}
UserFederationProviderModel model = realm.addUserFederationProvider(rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
- new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
+ new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
boolean kerberosCredsAdded = checkKerberosCredential(session, realm, model);
if (kerberosCredsAdded) {
logger.addedKerberosToRealmCredentials();
diff --git a/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java b/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
index 4b49d79..0ef806b 100644
--- a/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
+++ b/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
@@ -38,6 +38,7 @@ import org.keycloak.services.managers.UsersSyncManager;
import org.keycloak.services.resources.admin.AdminRoot;
import org.keycloak.services.scheduled.ClearExpiredEvents;
import org.keycloak.services.scheduled.ClearExpiredUserSessions;
+import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.services.scheduled.ScheduledTaskRunner;
import org.keycloak.services.util.JsonConfigProvider;
import org.keycloak.services.util.ObjectMapperResolver;
@@ -217,8 +218,8 @@ public class KeycloakApplication extends Application {
KeycloakSession session = sessionFactory.create();
try {
TimerProvider timer = session.getProvider(TimerProvider.class);
- timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredEvents()), interval, "ClearExpiredEvents");
- timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions()), interval, "ClearExpiredUserSessions");
+ timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredEvents(), interval), interval, "ClearExpiredEvents");
+ timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions(), interval), interval, "ClearExpiredUserSessions");
new UsersSyncManager().bootstrapPeriodic(sessionFactory, timer);
} finally {
session.close();
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java b/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java
new file mode 100644
index 0000000..7f60891
--- /dev/null
+++ b/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.services.scheduled;
+
+import java.util.concurrent.Callable;
+
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakSessionFactory;
+
+/**
+ * Ensures that there are not concurrent executions of same task (either on this host or any other cluster host)
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ClusterAwareScheduledTaskRunner extends ScheduledTaskRunner {
+
+ private final int intervalSecs;
+
+ public ClusterAwareScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task, long intervalMillis) {
+ super(sessionFactory, task);
+ this.intervalSecs = (int) (intervalMillis / 1000);
+ }
+
+ @Override
+ protected void runTask(final KeycloakSession session) {
+ session.getTransaction().begin();
+
+ ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+ String taskKey = task.getClass().getSimpleName();
+
+ ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted(taskKey, intervalSecs, new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ task.run(session);
+ return null;
+ }
+
+ });
+
+ session.getTransaction().commit();
+
+ if (result.isExecuted()) {
+ logger.debugf("Executed scheduled task %s", taskKey);
+ } else {
+ logger.debugf("Skipped execution of task %s as other cluster node is executing it", taskKey);
+ }
+ }
+
+
+}
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java b/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
index 51de35f..33dc91a 100644
--- a/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
+++ b/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
@@ -17,6 +17,10 @@
package org.keycloak.services.scheduled;
+import java.util.concurrent.Callable;
+
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.services.ServicesLogger;
@@ -26,10 +30,10 @@ import org.keycloak.services.ServicesLogger;
*/
public class ScheduledTaskRunner implements Runnable {
- private static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
+ protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
- private final KeycloakSessionFactory sessionFactory;
- private final ScheduledTask task;
+ protected final KeycloakSessionFactory sessionFactory;
+ protected final ScheduledTask task;
public ScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task) {
this.sessionFactory = sessionFactory;
@@ -40,11 +44,7 @@ public class ScheduledTaskRunner implements Runnable {
public void run() {
KeycloakSession session = sessionFactory.create();
try {
- session.getTransaction().begin();
- task.run(session);
- session.getTransaction().commit();
-
- logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
+ runTask(session);
} catch (Throwable t) {
logger.failedToRunScheduledTask(t, task.getClass().getSimpleName());
@@ -58,4 +58,12 @@ public class ScheduledTaskRunner implements Runnable {
}
}
+ protected void runTask(KeycloakSession session) {
+ session.getTransaction().begin();
+ task.run(session);
+ session.getTransaction().commit();
+
+ logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
+ }
+
}
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
index 517d45e..5d327f4 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
@@ -20,6 +20,8 @@ package org.keycloak.testsuite.federation.sync;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
@@ -37,6 +39,17 @@ import org.keycloak.testsuite.DummyUserFederationProviderFactory;
*/
public class SyncDummyUserFederationProviderFactory extends DummyUserFederationProviderFactory {
+ // Used during SyncFederationTest
+ static volatile CountDownLatch latch1 = new CountDownLatch(1);
+ static volatile CountDownLatch latch2 = new CountDownLatch(1);
+
+ static void restartLatches() {
+ latch1 = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+ }
+
+
+
private static final Logger logger = Logger.getLogger(SyncDummyUserFederationProviderFactory.class);
public static final String SYNC_PROVIDER_ID = "sync-dummy";
@@ -68,7 +81,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
RealmModel realm = session.realms().getRealm(realmId);
// KEYCLOAK-2412 : Just remove and add some users for testing purposes
- for (int i = 0; i<10; i++) {
+ for (int i = 0; i < 10; i++) {
String username = "dummyuser-" + i;
UserModel user = session.userStorage().getUserByUsername(username, realm);
@@ -83,7 +96,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
try {
- Thread.sleep(waitTime * 1000);
+ latch1.await(waitTime * 1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted!", ie);
@@ -94,6 +107,9 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
});
+ // countDown, so the SyncFederationTest can continue
+ latch2.countDown();
+
return new UserFederationSyncResult();
}
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
index 8259fe8..f3e2ec0 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
@@ -19,6 +19,7 @@ package org.keycloak.testsuite.federation.sync;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -84,7 +85,8 @@ public class SyncFederationTest {
sleep(1800);
// Cancel timer
- usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
+ RealmModel appRealm = session.realms().getRealmByName("test");
+ usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
// Assert that DummyUserFederationProviderFactory.syncChangedUsers was invoked
int newChanged = dummyFedFactory.getChangedSyncCounter();
@@ -111,7 +113,9 @@ public class SyncFederationTest {
}
@Test
- public void test02ConcurrentSync() {
+ public void test02ConcurrentSync() throws Exception {
+ SyncDummyUserFederationProviderFactory.restartLatches();
+
// Enable timer for SyncDummyUserFederationProvider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
@@ -139,11 +143,17 @@ public class SyncFederationTest {
Assert.assertTrue(syncResult.isIgnored());
// Cancel timer
- usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
+ usersSyncManager.notifyToRefreshPeriodicSync(session, realm, dummyModel, true);
+
+ // Signal to factory to finish waiting
+ SyncDummyUserFederationProviderFactory.latch1.countDown();
+
} finally {
keycloakRule.stopSession(session, true);
}
+ SyncDummyUserFederationProviderFactory.latch2.await(20000, TimeUnit.MILLISECONDS);
+
// remove provider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
index 2d763b6..d224b3e 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.keycloak.cluster.ClusterProvider;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.ClientModel;
import org.keycloak.models.ClientSessionModel;
@@ -83,7 +84,7 @@ public class UserSessionInitializerTest {
// Create and persist offline sessions
int started = Time.currentTime();
- int serverStartTime = session.sessions().getClusterStartupTime();
+ int serverStartTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
for (UserSessionModel origSession : origSessions) {
UserSessionModel userSession = session.sessions().getUserSession(realm, origSession.getId());
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
index 7a31d6d..452fd59 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
@@ -47,10 +47,11 @@ public class SyncDummyFederationProviderCommand extends AbstractCommand {
} else {
Map<String, String> cfg = fedProviderModel.getConfig();
updateConfig(cfg, waitTime);
+ fedProviderModel.setChangedSyncPeriod(changedSyncPeriod);
realm.updateUserFederationProvider(fedProviderModel);
}
- new UsersSyncManager().refreshPeriodicSyncForProvider(sessionFactory, session.getProvider(TimerProvider.class), fedProviderModel, "master");
+ new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, fedProviderModel, false);
log.infof("User federation provider created and sync was started", waitTime);
}