keycloak-memoizeit

KEYCLOAK-2412 Added ClusterProvider. Avoid concurrent federation

2/17/2016 3:01:28 AM

Changes

Details

diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java
new file mode 100644
index 0000000..9986dad
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProvider.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+import org.infinispan.Cache;
+import org.infinispan.context.Flag;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.remoting.transport.Transport;
+import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
+import org.keycloak.common.util.Time;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.KeycloakSession;
+
+/**
+ * Various utils related to clustering
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class InfinispanClusterProvider implements ClusterProvider {
+
+    protected static final Logger logger = Logger.getLogger(InfinispanClusterProvider.class);
+
+    public static final String CLUSTER_STARTUP_TIME_KEY = "cluster-start-time";
+    private static final String TASK_KEY_PREFIX = "task::";
+
+    private final InfinispanClusterProviderFactory factory;
+    private final KeycloakSession session;
+    private final Cache<String, Serializable> cache;
+
+    public InfinispanClusterProvider(InfinispanClusterProviderFactory factory, KeycloakSession session, Cache<String, Serializable> cache) {
+        this.factory = factory;
+        this.session = session;
+        this.cache = cache;
+    }
+
+
+    @Override
+    public int getClusterStartupTime() {
+        Integer existingClusterStartTime = (Integer) cache.get(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY);
+        if (existingClusterStartTime != null) {
+            return existingClusterStartTime;
+        } else {
+            // clusterStartTime not yet initialized. Let's try to put our startupTime
+            int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
+
+            existingClusterStartTime = (Integer) cache.putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
+            if (existingClusterStartTime == null) {
+                logger.infof("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
+                return serverStartTime;
+            } else {
+                return existingClusterStartTime;
+            }
+        }
+    }
+
+
+    @Override
+    public void close() {
+    }
+
+
+    @Override
+    public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
+        String cacheKey = TASK_KEY_PREFIX + taskKey;
+        boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
+        if (locked) {
+            try {
+                try {
+                    T result = task.call();
+                    return ExecutionResult.executed(result);
+                } catch (RuntimeException re) {
+                    throw re;
+                } catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
+                }
+            } finally {
+                removeFromCache(cacheKey);
+            }
+        } else {
+            return ExecutionResult.notExecuted();
+        }
+    }
+
+
+    @Override
+    public void registerListener(String taskKey, ClusterListener task) {
+        factory.registerListener(taskKey, task);
+    }
+
+
+    @Override
+    public void notify(String taskKey, ClusterEvent event) {
+        // Put the value to the cache to notify listeners on all the nodes
+        cache.put(taskKey, event);
+    }
+
+
+    private String getCurrentNode(Cache<String, Serializable> cache) {
+        Transport transport = cache.getCacheManager().getTransport();
+        return transport==null ? "local" : transport.getAddress().toString();
+    }
+
+
+    private LockEntry createLockEntry(Cache<String, Serializable> cache) {
+        LockEntry lock = new LockEntry();
+        lock.setNode(getCurrentNode(cache));
+        lock.setTimestamp(Time.currentTime());
+        return lock;
+    }
+
+
+    private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
+        LockEntry myLock = createLockEntry(cache);
+
+        LockEntry existingLock = (LockEntry) cache.putIfAbsent(cacheKey, myLock);
+        if (existingLock != null) {
+            // Task likely already in progress. Check if timestamp is not outdated
+            int thatTime = existingLock.getTimestamp();
+            int currentTime = Time.currentTime();
+            if (thatTime + taskTimeoutInSeconds < currentTime) {
+                logger.infof("Task %s outdated when in progress by node %s. Will try to replace task with our node %s", cacheKey, existingLock.getNode(), myLock.getNode());
+                boolean replaced = cache.replace(cacheKey, existingLock, myLock);
+                // TODO: trace
+                if (!replaced) {
+                    logger.infof("Failed to replace the task %s. Other thread replaced in the meantime. Ignoring task.", cacheKey);
+                }
+                return replaced;
+            } else {
+                logger.infof("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
+                return false;
+            }
+        } else {
+            logger.infof("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.getNode());
+            return true;
+        }
+    }
+
+
+    private void removeFromCache(String cacheKey) {
+        // 3 attempts to send the message (it may fail if some node fails in the meantime)
+        int retry = 3;
+        while (true) {
+            try {
+                cache.getAdvancedCache()
+                        .withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
+                        .remove(cacheKey);
+                logger.infof("Task %s removed from the cache", cacheKey);
+                return;
+            } catch (RuntimeException e) {
+                ComponentStatus status = cache.getStatus();
+                if (status.isStopping() || status.isTerminated()) {
+                    logger.warnf("Failed to remove task %s from the cache. Cache is already terminating", cacheKey);
+                    logger.debug(e.getMessage(), e);
+                    return;
+                }
+                retry--;
+                if (retry == 0) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
new file mode 100644
index 0000000..48adfe1
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanClusterProviderFactory.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+import org.jboss.logging.Logger;
+import org.keycloak.Config;
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ClusterProviderFactory;
+import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakSessionFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class InfinispanClusterProviderFactory implements ClusterProviderFactory {
+
+    public static final String PROVIDER_ID = "infinispan";
+
+    protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);
+
+    private volatile Cache<String, Serializable> workCache;
+
+    private Map<String, ClusterListener> listeners = new HashMap<>();
+
+    @Override
+    public ClusterProvider create(KeycloakSession session) {
+        lazyInit(session);
+        return new InfinispanClusterProvider(this, session, workCache);
+    }
+
+    private void lazyInit(KeycloakSession session) {
+        if (workCache == null) {
+            synchronized (this) {
+                if (workCache == null) {
+                    workCache = session.getProvider(InfinispanConnectionProvider.class).getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+                    workCache.getCacheManager().addListener(new ViewChangeListener());
+                    workCache.addListener(new CacheEntryListener());
+                }
+            }
+        }
+    }
+
+    @Override
+    public void init(Config.Scope config) {
+    }
+
+    @Override
+    public void postInit(KeycloakSessionFactory factory) {
+    }
+
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public String getId() {
+        return PROVIDER_ID;
+    }
+
+
+    @Listener
+    public class ViewChangeListener {
+
+        @ViewChanged
+        public void viewChanged(ViewChangedEvent event) {
+            EmbeddedCacheManager cacheManager = event.getCacheManager();
+            Transport transport = cacheManager.getTransport();
+
+            // Coordinator makes sure that entries for outdated nodes are cleaned up
+            if (transport != null && transport.isCoordinator()) {
+
+                Set<String> newAddresses = convertAddresses(event.getNewMembers());
+                Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
+                removedNodesAddresses.removeAll(newAddresses);
+
+                if (removedNodesAddresses.isEmpty()) {
+                    return;
+                }
+
+                logger.infof("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
+
+                Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
+
+                Iterator<String> toRemove = cache.entrySet().stream().filter(new Predicate<Map.Entry<String, Serializable>>() {
+
+                    @Override
+                    public boolean test(Map.Entry<String, Serializable> entry) {
+                        if (!(entry.getValue() instanceof LockEntry)) {
+                            return false;
+                        }
+
+                        LockEntry lock = (LockEntry) entry.getValue();
+                        return removedNodesAddresses.contains(lock.getNode());
+                    }
+
+                }).map(new Function<Map.Entry<String, Serializable>, String>() {
+
+                    @Override
+                    public String apply(Map.Entry<String, Serializable> entry) {
+                        return entry.getKey();
+                    }
+
+                }).iterator();
+
+                while (toRemove.hasNext()) {
+                    String rem = toRemove.next();
+                    logger.infof("Removing task %s due it's node left cluster", rem);
+                    cache.remove(rem);
+                }
+            }
+        }
+
+        private Set<String> convertAddresses(Collection<Address> addresses) {
+            return addresses.stream().map(new Function<Address, String>() {
+
+                @Override
+                public String apply(Address address) {
+                    return address.toString();
+                }
+
+            }).collect(Collectors.toSet());
+        }
+
+    }
+
+
+    <T> void registerListener(String taskKey, ClusterListener task) {
+        listeners.put(taskKey, task);
+    }
+
+    @Listener
+    public class CacheEntryListener {
+
+        @CacheEntryCreated
+        public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
+            if (!event.isPre()) {
+                trigger(event.getKey(), event.getValue());
+            }
+        }
+
+        @CacheEntryModified
+        public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
+            if (!event.isPre()) {
+                trigger(event.getKey(), event.getValue());
+            }
+        }
+
+        private void trigger(String key, Object value) {
+            ClusterListener task = listeners.get(key);
+            if (task != null) {
+                ClusterEvent event = (ClusterEvent) value;
+                task.run(event);
+            }
+        }
+    }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java
new file mode 100644
index 0000000..f6a795a
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/LockEntry.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster.infinispan;
+
+import java.io.Serializable;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class LockEntry implements Serializable {
+
+    private String node;
+    private int timestamp;
+
+    public String getNode() {
+        return node;
+    }
+
+    public void setNode(String node) {
+        this.node = node;
+    }
+
+    public int getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(int timestamp) {
+        this.timestamp = timestamp;
+    }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
index b90ef33..01d229f 100755
--- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProviderFactory.java
@@ -23,6 +23,8 @@ import org.infinispan.configuration.cache.ConfigurationBuilder;
 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.transaction.LockingMode;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
 import org.jboss.logging.Logger;
 import org.keycloak.Config;
 import org.keycloak.models.KeycloakSession;
@@ -145,6 +147,13 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
         cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, sessionCacheConfiguration);
         cacheManager.defineConfiguration(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, sessionCacheConfiguration);
         cacheManager.defineConfiguration(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME, sessionCacheConfiguration);
+
+        ConfigurationBuilder replicationConfigBuilder = new ConfigurationBuilder();
+        if (clustered) {
+            replicationConfigBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
+        }
+        Configuration replicationCacheConfiguration = replicationConfigBuilder.build();
+        cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, replicationCacheConfiguration);
     }
 
 }
diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
index 2c3ca14..b37beb9 100644
--- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java
@@ -30,6 +30,7 @@ public interface InfinispanConnectionProvider extends Provider {
     static final String SESSION_CACHE_NAME = "sessions";
     static final String OFFLINE_SESSION_CACHE_NAME = "offlineSessions";
     static final String LOGIN_FAILURE_CACHE_NAME = "loginFailures";
+    static final String WORK_CACHE_NAME = "work";
 
     <K, V> Cache<K, V> getCache(String name);
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
index 1fc04de..8602834 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java
@@ -21,15 +21,41 @@ import org.infinispan.Cache;
 import org.infinispan.CacheStream;
 import org.jboss.logging.Logger;
 import org.keycloak.common.util.Time;
-import org.keycloak.models.*;
+import org.keycloak.models.ClientInitialAccessModel;
+import org.keycloak.models.ClientModel;
+import org.keycloak.models.ClientSessionModel;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakTransaction;
+import org.keycloak.models.RealmModel;
+import org.keycloak.models.UserModel;
+import org.keycloak.models.UserSessionModel;
+import org.keycloak.models.UserSessionProvider;
+import org.keycloak.models.UsernameLoginFailureModel;
 import org.keycloak.models.session.UserSessionPersisterProvider;
-import org.keycloak.models.sessions.infinispan.entities.*;
-import org.keycloak.models.sessions.infinispan.initializer.TimeAwareInitializerState;
-import org.keycloak.models.sessions.infinispan.stream.*;
+import org.keycloak.models.sessions.infinispan.entities.ClientInitialAccessEntity;
+import org.keycloak.models.sessions.infinispan.entities.ClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
+import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.stream.ClientInitialAccessPredicate;
+import org.keycloak.models.sessions.infinispan.stream.ClientSessionPredicate;
+import org.keycloak.models.sessions.infinispan.stream.Comparators;
+import org.keycloak.models.sessions.infinispan.stream.Mappers;
+import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
+import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
+import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
 import org.keycloak.models.utils.KeycloakModelUtils;
 import org.keycloak.models.utils.RealmInfoUtil;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
@@ -412,19 +438,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
     }
 
     @Override
-    public int getClusterStartupTime() {
-        TimeAwareInitializerState state = (TimeAwareInitializerState) offlineSessionCache.get(InfinispanUserSessionProviderFactory.SESSION_INITIALIZER_STATE_KEY);
-        int startTime;
-        if (state == null) {
-            log.warn("Cluster startup time not yet available. Fallback to local startup time");
-            startTime = (int)(session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
-        } else {
-            startTime = state.getClusterStartupTime();
-        }
-        return startTime;
-    }
-
-    @Override
     public void close() {
     }
 
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
index 1bb82a1..a7c312f 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java
@@ -17,6 +17,8 @@
 
 package org.keycloak.models.sessions.infinispan;
 
+import java.io.Serializable;
+
 import org.infinispan.Cache;
 import org.jboss.logging.Logger;
 import org.keycloak.Config;
@@ -34,9 +36,6 @@ import org.keycloak.provider.ProviderEventListener;
 
 public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory {
 
-    private static final String STATE_KEY_PREFIX = "initializerState";
-    public static final String SESSION_INITIALIZER_STATE_KEY = STATE_KEY_PREFIX + "::offlineUserSessions";
-
     private static final Logger log = Logger.getLogger(InfinispanUserSessionProviderFactory.class);
 
     private Config.Scope config;
@@ -85,9 +84,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
             @Override
             public void run(KeycloakSession session) {
                 InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
-                Cache<String, SessionEntity> cache = connections.getCache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME);
+                Cache<String, Serializable> cache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
 
-                InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, SESSION_INITIALIZER_STATE_KEY);
+                InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, "offlineUserSessions");
                 initializer.initCache();
                 initializer.loadPersistentSessions();
             }
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
index deae897..8dd9575 100755
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanUserSessionInitializer.java
@@ -20,57 +20,55 @@ package org.keycloak.models.sessions.infinispan.initializer;
 import java.io.Serializable;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.infinispan.Cache;
 import org.infinispan.context.Flag;
 import org.infinispan.distexec.DefaultExecutorService;
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
-import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.remoting.transport.Transport;
 import org.jboss.logging.Logger;
-import org.keycloak.common.util.Time;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.KeycloakSessionFactory;
 import org.keycloak.models.KeycloakSessionTask;
-import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
 import org.keycloak.models.utils.KeycloakModelUtils;
 
 /**
  * Startup initialization for reading persistent userSessions/clientSessions to be filled into infinispan/memory . In cluster,
  * the initialization is distributed among all cluster nodes, so the startup time is even faster
  *
+ * TODO: Move to clusterService. Implementation is already pretty generic and doesn't contain any "userSession" specific stuff. All logic is in the SessionLoader implementation
+ *
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
 public class InfinispanUserSessionInitializer {
 
+    private static final String STATE_KEY_PREFIX = "distributed::";
+
     private static final Logger log = Logger.getLogger(InfinispanUserSessionInitializer.class);
 
     private final KeycloakSessionFactory sessionFactory;
-    private final Cache<String, SessionEntity> cache;
+    private final Cache<String, Serializable> workCache;
     private final SessionLoader sessionLoader;
     private final int maxErrors;
     private final int sessionsPerSegment;
     private final String stateKey;
 
 
-    public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, SessionEntity> cache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKey) {
+    public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKeySuffix) {
         this.sessionFactory = sessionFactory;
-        this.cache = cache;
+        this.workCache = workCache;
         this.sessionLoader = sessionLoader;
         this.maxErrors = maxErrors;
         this.sessionsPerSegment = sessionsPerSegment;
-        this.stateKey = stateKey;
+        this.stateKey = STATE_KEY_PREFIX + stateKeySuffix;
     }
 
     public void initCache() {
-        this.cache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
+        this.workCache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
     }
 
 
@@ -94,16 +92,14 @@ public class InfinispanUserSessionInitializer {
 
 
     private boolean isFinished() {
-        InitializerState state = (InitializerState) cache.get(stateKey);
+        InitializerState state = (InitializerState) workCache.get(stateKey);
         return state != null && state.isFinished();
     }
 
 
     private InitializerState getOrCreateInitializerState() {
-        TimeAwareInitializerState state = (TimeAwareInitializerState) cache.get(stateKey);
+        InitializerState state = (InitializerState) workCache.get(stateKey);
         if (state == null) {
-            int startTime = (int)(sessionFactory.getServerStartupTimestamp() / 1000);
-
             final int[] count = new int[1];
 
             // Rather use separate transactions for update and counting
@@ -111,7 +107,7 @@ public class InfinispanUserSessionInitializer {
             KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
                 @Override
                 public void run(KeycloakSession session) {
-                    sessionLoader.init(session, startTime);
+                    sessionLoader.init(session);
                 }
 
             });
@@ -124,9 +120,8 @@ public class InfinispanUserSessionInitializer {
 
             });
 
-            state = new TimeAwareInitializerState();
+            state = new InitializerState();
             state.init(count[0], sessionsPerSegment);
-            state.setClusterStartupTime(startTime);
             saveStateToCache(state);
         }
         return state;
@@ -143,7 +138,7 @@ public class InfinispanUserSessionInitializer {
             public void run() {
 
                 // Save this synchronously to ensure all nodes read correct state
-                InfinispanUserSessionInitializer.this.cache.getAdvancedCache().
+                InfinispanUserSessionInitializer.this.workCache.getAdvancedCache().
                         withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
                         .put(stateKey, state);
             }
@@ -153,7 +148,7 @@ public class InfinispanUserSessionInitializer {
 
 
     private boolean isCoordinator() {
-        Transport transport = cache.getCacheManager().getTransport();
+        Transport transport = workCache.getCacheManager().getTransport();
         return transport == null || transport.isCoordinator();
     }
 
@@ -166,9 +161,9 @@ public class InfinispanUserSessionInitializer {
         int processors = Runtime.getRuntime().availableProcessors();
 
         ExecutorService localExecutor = Executors.newCachedThreadPool();
-        Transport transport = cache.getCacheManager().getTransport();
+        Transport transport = workCache.getCacheManager().getTransport();
         boolean distributed = transport != null;
-        ExecutorService executorService = distributed ? new DefaultExecutorService(cache, localExecutor) : localExecutor;
+        ExecutorService executorService = distributed ? new DefaultExecutorService(workCache, localExecutor) : localExecutor;
 
         int errors = 0;
 
@@ -190,7 +185,7 @@ public class InfinispanUserSessionInitializer {
                     SessionInitializerWorker worker = new SessionInitializerWorker();
                     worker.setWorkerEnvironment(segment, sessionsPerSegment, sessionLoader);
                     if (!distributed) {
-                        worker.setEnvironment(cache, null);
+                        worker.setEnvironment(workCache, null);
                     }
 
                     Future<WorkerResult> future = executorService.submit(worker);
@@ -242,6 +237,12 @@ public class InfinispanUserSessionInitializer {
                 runnable.run();
                 return;
             } catch (RuntimeException e) {
+                ComponentStatus status = workCache.getStatus();
+                if (status.isStopping() || status.isTerminated()) {
+                    log.warn("Failed to put initializerState to the cache. Cache is already terminating");
+                    log.debug(e.getMessage(), e);
+                    return;
+                }
                 retry--;
                 if (retry == 0) {
                     throw e;
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
index 8b651c0..352372c 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflineUserSessionLoader.java
@@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.initializer;
 import java.util.List;
 
 import org.jboss.logging.Logger;
+import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.models.ClientSessionModel;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.UserSessionModel;
@@ -33,9 +34,12 @@ public class OfflineUserSessionLoader implements SessionLoader {
     private static final Logger log = Logger.getLogger(OfflineUserSessionLoader.class);
 
     @Override
-    public void init(KeycloakSession session, int clusterStartupTime) {
+    public void init(KeycloakSession session) {
         UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
 
+        // TODO: check if update of timestamps in persister can be skipped entirely
+        int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
+
         log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
 
         persister.clearDetachedUserSessions();
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
index eba3cea..b5b8557 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java
@@ -32,7 +32,7 @@ import org.keycloak.models.utils.KeycloakModelUtils;
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
-public class SessionInitializerWorker implements DistributedCallable<String, SessionEntity, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
+public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
 
     private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
 
@@ -40,7 +40,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
     private int sessionsPerSegment;
     private SessionLoader sessionLoader;
 
-    private transient Cache<String, SessionEntity> cache;
+    private transient Cache<String, Serializable> workCache;
 
     public void setWorkerEnvironment(int segment, int sessionsPerSegment, SessionLoader sessionLoader) {
         this.segment = segment;
@@ -49,8 +49,8 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
     }
 
     @Override
-    public void setEnvironment(Cache<String, SessionEntity> cache, Set<String> inputKeys) {
-        this.cache = cache;
+    public void setEnvironment(Cache<String, Serializable> workCache, Set<String> inputKeys) {
+        this.workCache = workCache;
     }
 
     @Override
@@ -59,7 +59,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
             log.tracef("Running computation for segment: %d", segment);
         }
 
-        KeycloakSessionFactory sessionFactory = cache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
+        KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
         if (sessionFactory == null) {
             log.warnf("KeycloakSessionFactory not yet set in cache. Worker skipped");
             return InfinispanUserSessionInitializer.WorkerResult.create(segment, false);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
index b8aa0f8..3185a39 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionLoader.java
@@ -26,7 +26,7 @@ import org.keycloak.models.KeycloakSession;
  */
 public interface SessionLoader extends Serializable {
 
-    void init(KeycloakSession session, int clusterStartupTime);
+    void init(KeycloakSession session);
 
     int getSessionsCount(KeycloakSession session);
 
diff --git a/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory b/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory
new file mode 100644
index 0000000..c4c555f
--- /dev/null
+++ b/model/infinispan/src/main/resources/META-INF/services/org.keycloak.cluster.ClusterProviderFactory
@@ -0,0 +1,18 @@
+#
+# Copyright 2016 Red Hat, Inc. and/or its affiliates
+# and other contributors as indicated by the @author tags.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory
\ No newline at end of file
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java
new file mode 100644
index 0000000..b29848c
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import java.io.Serializable;
+
+/**
+ * Task to be executed on all cluster nodes once it's notified.
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterListener {
+
+    /**
+     * Registered task to be executed on all cluster nodes once it's notified from cache.
+     *
+     * @param event value of notification (Object added into the cache)
+     */
+    void run(ClusterEvent event);
+
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java
new file mode 100644
index 0000000..70b64a1
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+
+import java.util.concurrent.Callable;
+
+import org.keycloak.provider.Provider;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterProvider extends Provider {
+
+    /**
+     * Will use startup time of this server in non-cluster environment. Otherwise the value is same for all cluster nodes
+     */
+    int getClusterStartupTime();
+
+
+    /**
+     * Execute given task just if it's not already in progress (either on this or any other cluster node).
+     *
+     * @param taskKey
+     * @param taskTimeoutInSeconds timeout for given task. If there is existing task in progress for longer time, it's considered outdated so we will start our task.
+     * @param task
+     * @param <T>
+     * @return result with "executed" flag specifying if execution was executed or ignored.
+     */
+    <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task);
+
+
+    /**
+     * Register task (listener) under given key. When this key will be put to the cache on any cluster node, the task will be executed
+     *
+     * @param taskKey
+     * @param task
+     */
+    void registerListener(String taskKey, ClusterListener task);
+
+
+    /**
+     * Notify registered listeners on all cluster nodes
+     *
+     * @param taskKey
+     * @param event
+     */
+    void notify(String taskKey, ClusterEvent event);
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java
new file mode 100644
index 0000000..41c00f4
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterProviderFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import org.keycloak.provider.ProviderFactory;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider> {
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java b/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java
new file mode 100644
index 0000000..7eba4e3
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ClusterSpi.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+import org.keycloak.provider.Provider;
+import org.keycloak.provider.ProviderFactory;
+import org.keycloak.provider.Spi;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ClusterSpi implements Spi {
+
+    @Override
+    public boolean isInternal() {
+        return true;
+    }
+
+    @Override
+    public String getName() {
+        return "cluster";
+    }
+
+    @Override
+    public Class<? extends Provider> getProviderClass() {
+        return ClusterProvider.class;
+    }
+
+    @Override
+    public Class<? extends ProviderFactory> getProviderFactoryClass() {
+        return ClusterProviderFactory.class;
+    }
+}
diff --git a/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java b/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java
new file mode 100644
index 0000000..b70e024
--- /dev/null
+++ b/server-spi/src/main/java/org/keycloak/cluster/ExecutionResult.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.cluster;
+
+/**
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ExecutionResult<T> {
+
+    private final boolean executed;
+    private final T result;
+
+    private ExecutionResult(boolean executed, T result) {
+        this.executed = executed;
+        this.result = result;
+    }
+
+    public static <T> ExecutionResult<T> executed(T result) {
+        return new ExecutionResult<>(true, result);
+    }
+
+    public static <T> ExecutionResult<T> notExecuted() {
+        return new ExecutionResult<>(false, null);
+    }
+
+    public boolean isExecuted() {
+        return executed;
+    }
+
+    public T getResult() {
+        return result;
+    }
+}
diff --git a/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java b/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
index b8911f2..a9a32f9 100644
--- a/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserFederationSyncResult.java
@@ -112,4 +112,10 @@ public class UserFederationSyncResult {
     public static UserFederationSyncResult empty() {
         return new UserFederationSyncResult();
     }
+
+    public static UserFederationSyncResult ignored() {
+        UserFederationSyncResult result = new UserFederationSyncResult();
+        result.setIgnored(true);
+        return result;
+    }
 }
diff --git a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
index 9d7c413..57cd49c 100755
--- a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
+++ b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java
@@ -82,9 +82,6 @@ public interface UserSessionProvider extends Provider {
     void removeClientInitialAccessModel(RealmModel realm, String id);
     List<ClientInitialAccessModel> listClientInitialAccess(RealmModel realm);
 
-    // Will use startup time of this server in non-cluster environment
-    int getClusterStartupTime();
-
     void close();
 
 }
diff --git a/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi b/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
index 9e7403b..7bb58fc 100755
--- a/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
+++ b/server-spi/src/main/resources/META-INF/services/org.keycloak.provider.Spi
@@ -49,5 +49,6 @@ org.keycloak.authentication.ClientAuthenticatorSpi
 org.keycloak.authentication.RequiredActionSpi
 org.keycloak.authentication.FormAuthenticatorSpi
 org.keycloak.authentication.FormActionSpi
+org.keycloak.cluster.ClusterSpi
 
 
diff --git a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
index e4174d9..e580af6 100755
--- a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
+++ b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java
@@ -17,6 +17,7 @@
 
 package org.keycloak.protocol.oidc;
 
+import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.common.ClientConnection;
 import org.keycloak.OAuth2Constants;
 import org.keycloak.OAuthErrorException;
@@ -193,7 +194,7 @@ public class TokenManager {
         int currentTime = Time.currentTime();
 
         if (realm.isRevokeRefreshToken()) {
-            int clusterStartupTime = session.sessions().getClusterStartupTime();
+            int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
 
             if (refreshToken.getIssuedAt() < validation.clientSession.getTimestamp() && (clusterStartupTime != validation.clientSession.getTimestamp())) {
                 throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
diff --git a/services/src/main/java/org/keycloak/services/managers/RealmManager.java b/services/src/main/java/org/keycloak/services/managers/RealmManager.java
index 55695a0..8b6a732 100755
--- a/services/src/main/java/org/keycloak/services/managers/RealmManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/RealmManager.java
@@ -231,7 +231,7 @@ public class RealmManager implements RealmImporter {
             // Remove all periodic syncs for configured federation providers
             UsersSyncManager usersSyncManager = new UsersSyncManager();
             for (final UserFederationProviderModel fedProvider : federationProviders) {
-                usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), fedProvider);
+                usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, true);
             }
         }
         return removed;
@@ -434,7 +434,7 @@ public class RealmManager implements RealmImporter {
         List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
         UsersSyncManager usersSyncManager = new UsersSyncManager();
         for (final UserFederationProviderModel fedProvider : federationProviders) {
-            usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
+            usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
         }
         return realm;
     }
diff --git a/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java b/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
index c44bca5..2177d04 100755
--- a/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
+++ b/services/src/main/java/org/keycloak/services/managers/UsersSyncManager.java
@@ -16,6 +16,10 @@
  */
 package org.keycloak.services.managers;
 
+import org.keycloak.cluster.ClusterEvent;
+import org.keycloak.cluster.ClusterListener;
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
 import org.keycloak.common.util.Time;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.KeycloakSessionFactory;
@@ -30,13 +34,17 @@ import org.keycloak.services.ServicesLogger;
 import org.keycloak.timer.TimerProvider;
 
 
+import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 /**
  * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
  */
 public class UsersSyncManager {
 
+    private static final String FEDERATION_TASK_KEY = "federation";
+
     protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
 
     /**
@@ -57,26 +65,106 @@ public class UsersSyncManager {
                         refreshPeriodicSyncForProvider(sessionFactory, timer, fedProvider, realm.getId());
                     }
                 }
+
+                ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+                clusterProvider.registerListener(FEDERATION_TASK_KEY, new UserFederationClusterListener(sessionFactory));
             }
         });
     }
 
-    public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
-        final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
-        updateLastSyncInterval(sessionFactory, fedProvider, realmId);
-        return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
+
+    private class Holder {
+        ExecutionResult<UserFederationSyncResult> result;
+    }
+
+    public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
+        final Holder holder = new Holder();
+
+        // Ensure not executed concurrently on this or any other cluster node
+        KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+            @Override
+            public void run(KeycloakSession session) {
+                ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+                // shared key for "full" and "changed" . Improve if needed
+                String taskKey = fedProvider.getId() + "::sync";
+
+                // 30 seconds minimal timeout for now
+                int timeout = Math.max(30, fedProvider.getFullSyncPeriod());
+                holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
+
+                    @Override
+                    public UserFederationSyncResult call() throws Exception {
+                        final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
+                        updateLastSyncInterval(sessionFactory, fedProvider, realmId);
+                        return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
+                    }
+
+                });
+            }
+
+        });
+
+        if (holder.result == null || !holder.result.isExecuted()) {
+            logger.infof("syncAllUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
+            return UserFederationSyncResult.ignored();
+        } else {
+            return holder.result.getResult();
+        }
+    }
+
+    public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
+        final Holder holder = new Holder();
+
+        // Ensure not executed concurrently on this or any other cluster node
+        KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+            @Override
+            public void run(KeycloakSession session) {
+                ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+                // shared key for "full" and "changed" . Improve if needed
+                String taskKey = fedProvider.getId() + "::sync";
+
+                // 30 seconds minimal timeout for now
+                int timeout = Math.max(30, fedProvider.getChangedSyncPeriod());
+                holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
+
+                    @Override
+                    public UserFederationSyncResult call() throws Exception {
+                        final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
+
+                        // See when we did last sync.
+                        int oldLastSync = fedProvider.getLastSync();
+                        updateLastSyncInterval(sessionFactory, fedProvider, realmId);
+                        return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
+                    }
+
+                });
+            }
+
+        });
+
+        if (holder.result == null || !holder.result.isExecuted()) {
+            logger.infof("syncChangedUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
+            return UserFederationSyncResult.ignored();
+        } else {
+            return holder.result.getResult();
+        }
     }
 
-    public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
-        final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
 
-        // See when we did last sync.
-        int oldLastSync = fedProvider.getLastSync();
-        updateLastSyncInterval(sessionFactory, fedProvider, realmId);
-        return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
+    // Ensure all cluster nodes are notified
+    public void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserFederationProviderModel federationProvider, boolean removed) {
+        FederationProviderClusterEvent event = FederationProviderClusterEvent.createEvent(removed, realm.getId(), federationProvider);
+        session.getProvider(ClusterProvider.class).notify(FEDERATION_TASK_KEY, event);
     }
 
-    public void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
+
+    // Executed once it receives notification that some UserFederationProvider was created or updated
+    protected void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
+        logger.infof("Going to refresh periodic sync for provider '%s' . Full sync period: %d , changed users sync period: %d",
+                fedProvider.getDisplayName(), fedProvider.getFullSyncPeriod(), fedProvider.getChangedSyncPeriod());
+
         if (fedProvider.getFullSyncPeriod() > 0) {
             // We want periodic full sync for this provider
             timer.schedule(new Runnable() {
@@ -84,7 +172,12 @@ public class UsersSyncManager {
                 @Override
                 public void run() {
                     try {
-                        syncAllUsers(sessionFactory, realmId, fedProvider);
+                        boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
+                        if (shouldPerformSync) {
+                            syncAllUsers(sessionFactory, realmId, fedProvider);
+                        } else {
+                            logger.infof("Ignored periodic full sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
+                        }
                     } catch (Throwable t) {
                         logger.errorDuringFullUserSync(t);
                     }
@@ -102,7 +195,12 @@ public class UsersSyncManager {
                 @Override
                 public void run() {
                     try {
-                        syncChangedUsers(sessionFactory, realmId, fedProvider);
+                        boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
+                        if (shouldPerformSync) {
+                            syncChangedUsers(sessionFactory, realmId, fedProvider);
+                        } else {
+                            logger.infof("Ignored periodic changed-users sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
+                        }
                     } catch (Throwable t) {
                         logger.errorDuringChangedUserSync(t);
                     }
@@ -115,7 +213,21 @@ public class UsersSyncManager {
         }
     }
 
-    public void removePeriodicSyncForProvider(TimerProvider timer, final UserFederationProviderModel fedProvider) {
+    // Skip syncing if there is short time since last sync time.
+    private boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
+        if (lastSyncTime <= 0) {
+            return true;
+        }
+
+        int currentTime = Time.currentTime();
+        int timeSinceLastSync = currentTime - lastSyncTime;
+
+        return (timeSinceLastSync * 2 > period);
+    }
+
+    // Executed once it receives notification that some UserFederationProvider was removed
+    protected void removePeriodicSyncForProvider(TimerProvider timer, UserFederationProviderModel fedProvider) {
+        logger.infof("Removing periodic sync for provider %s", fedProvider.getDisplayName());
         timer.cancelTask(fedProvider.getId() + "-FULL");
         timer.cancelTask(fedProvider.getId() + "-CHANGED");
     }
@@ -144,4 +256,73 @@ public class UsersSyncManager {
         });
     }
 
+
+    private class UserFederationClusterListener implements ClusterListener {
+
+        private final KeycloakSessionFactory sessionFactory;
+
+        public UserFederationClusterListener(KeycloakSessionFactory sessionFactory) {
+            this.sessionFactory = sessionFactory;
+        }
+
+        @Override
+        public void run(ClusterEvent event) {
+            final FederationProviderClusterEvent fedEvent = (FederationProviderClusterEvent) event;
+            KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
+
+                @Override
+                public void run(KeycloakSession session) {
+                    TimerProvider timer = session.getProvider(TimerProvider.class);
+                    if (fedEvent.isRemoved()) {
+                        removePeriodicSyncForProvider(timer, fedEvent.getFederationProvider());
+                    } else {
+                        refreshPeriodicSyncForProvider(sessionFactory, timer, fedEvent.getFederationProvider(), fedEvent.getRealmId());
+                    }
+                }
+
+            });
+        }
+    }
+
+
+    // Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
+    public static class FederationProviderClusterEvent implements ClusterEvent {
+
+        private boolean removed;
+        private String realmId;
+        private UserFederationProviderModel federationProvider;
+
+        public boolean isRemoved() {
+            return removed;
+        }
+
+        public void setRemoved(boolean removed) {
+            this.removed = removed;
+        }
+
+        public String getRealmId() {
+            return realmId;
+        }
+
+        public void setRealmId(String realmId) {
+            this.realmId = realmId;
+        }
+
+        public UserFederationProviderModel getFederationProvider() {
+            return federationProvider;
+        }
+
+        public void setFederationProvider(UserFederationProviderModel federationProvider) {
+            this.federationProvider = federationProvider;
+        }
+
+        public static FederationProviderClusterEvent createEvent(boolean removed, String realmId, UserFederationProviderModel fedProvider) {
+            FederationProviderClusterEvent notification = new FederationProviderClusterEvent();
+            notification.setRemoved(removed);
+            notification.setRealmId(realmId);
+            notification.setFederationProvider(fedProvider);
+            return notification;
+        }
+    }
+
 }
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java b/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
index 15b0e81..f0cbe8b 100644
--- a/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/RealmAdminResource.java
@@ -242,7 +242,7 @@ public class RealmAdminResource {
             List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
             UsersSyncManager usersSyncManager = new UsersSyncManager();
             for (final UserFederationProviderModel fedProvider : federationProviders) {
-                usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
+                usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
             }
 
             adminEvent.operation(OperationType.UPDATE).representation(rep).success();
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
index 1000ead..0eb1748 100755
--- a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProviderResource.java
@@ -106,7 +106,7 @@ public class UserFederationProviderResource {
         UserFederationProviderModel model = new UserFederationProviderModel(rep.getId(), rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
                 rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
         realm.updateUserFederationProvider(model);
-        new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
+        new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
         boolean kerberosCredsAdded = UserFederationProvidersResource.checkKerberosCredential(session, realm, model);
         if (kerberosCredsAdded) {
             logger.addedKerberosToRealmCredentials();
@@ -138,7 +138,7 @@ public class UserFederationProviderResource {
         auth.requireManage();
 
         realm.removeUserFederationProvider(this.federationProviderModel);
-        new UsersSyncManager().removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), this.federationProviderModel);
+        new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, this.federationProviderModel, true);
 
         adminEvent.operation(OperationType.DELETE).resourcePath(uriInfo).success();
 
diff --git a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
index 30a93e4..995cda4 100755
--- a/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
+++ b/services/src/main/java/org/keycloak/services/resources/admin/UserFederationProvidersResource.java
@@ -178,7 +178,7 @@ public class UserFederationProvidersResource {
         }
         UserFederationProviderModel model = realm.addUserFederationProvider(rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
                 rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
-        new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
+        new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
         boolean kerberosCredsAdded = checkKerberosCredential(session, realm, model);
         if (kerberosCredsAdded) {
             logger.addedKerberosToRealmCredentials();
diff --git a/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java b/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
index 4b49d79..0ef806b 100644
--- a/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
+++ b/services/src/main/java/org/keycloak/services/resources/KeycloakApplication.java
@@ -38,6 +38,7 @@ import org.keycloak.services.managers.UsersSyncManager;
 import org.keycloak.services.resources.admin.AdminRoot;
 import org.keycloak.services.scheduled.ClearExpiredEvents;
 import org.keycloak.services.scheduled.ClearExpiredUserSessions;
+import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
 import org.keycloak.services.scheduled.ScheduledTaskRunner;
 import org.keycloak.services.util.JsonConfigProvider;
 import org.keycloak.services.util.ObjectMapperResolver;
@@ -217,8 +218,8 @@ public class KeycloakApplication extends Application {
         KeycloakSession session = sessionFactory.create();
         try {
             TimerProvider timer = session.getProvider(TimerProvider.class);
-            timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredEvents()), interval, "ClearExpiredEvents");
-            timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions()), interval, "ClearExpiredUserSessions");
+            timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredEvents(), interval), interval, "ClearExpiredEvents");
+            timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions(), interval), interval, "ClearExpiredUserSessions");
             new UsersSyncManager().bootstrapPeriodic(sessionFactory, timer);
         } finally {
             session.close();
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java b/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java
new file mode 100644
index 0000000..7f60891
--- /dev/null
+++ b/services/src/main/java/org/keycloak/services/scheduled/ClusterAwareScheduledTaskRunner.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2016 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.services.scheduled;
+
+import java.util.concurrent.Callable;
+
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
+import org.keycloak.models.KeycloakSession;
+import org.keycloak.models.KeycloakSessionFactory;
+
+/**
+ * Ensures that there are not concurrent executions of same task (either on this host or any other cluster host)
+ *
+ * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
+ */
+public class ClusterAwareScheduledTaskRunner extends ScheduledTaskRunner {
+
+    private final int intervalSecs;
+
+    public ClusterAwareScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task, long intervalMillis) {
+        super(sessionFactory, task);
+        this.intervalSecs = (int) (intervalMillis / 1000);
+    }
+
+    @Override
+    protected void runTask(final KeycloakSession session) {
+        session.getTransaction().begin();
+
+        ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
+        String taskKey = task.getClass().getSimpleName();
+
+        ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted(taskKey, intervalSecs, new Callable<Void>() {
+
+            @Override
+            public Void call() throws Exception {
+                task.run(session);
+                return null;
+            }
+
+        });
+
+        session.getTransaction().commit();
+
+        if (result.isExecuted()) {
+            logger.debugf("Executed scheduled task %s", taskKey);
+        } else {
+            logger.debugf("Skipped execution of task %s as other cluster node is executing it", taskKey);
+        }
+    }
+
+
+}
diff --git a/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java b/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
index 51de35f..33dc91a 100644
--- a/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
+++ b/services/src/main/java/org/keycloak/services/scheduled/ScheduledTaskRunner.java
@@ -17,6 +17,10 @@
 
 package org.keycloak.services.scheduled;
 
+import java.util.concurrent.Callable;
+
+import org.keycloak.cluster.ClusterProvider;
+import org.keycloak.cluster.ExecutionResult;
 import org.keycloak.models.KeycloakSession;
 import org.keycloak.models.KeycloakSessionFactory;
 import org.keycloak.services.ServicesLogger;
@@ -26,10 +30,10 @@ import org.keycloak.services.ServicesLogger;
  */
 public class ScheduledTaskRunner implements Runnable {
 
-    private static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
+    protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
 
-    private final KeycloakSessionFactory sessionFactory;
-    private final ScheduledTask task;
+    protected final KeycloakSessionFactory sessionFactory;
+    protected final ScheduledTask task;
 
     public ScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task) {
         this.sessionFactory = sessionFactory;
@@ -40,11 +44,7 @@ public class ScheduledTaskRunner implements Runnable {
     public void run() {
         KeycloakSession session = sessionFactory.create();
         try {
-            session.getTransaction().begin();
-            task.run(session);
-            session.getTransaction().commit();
-
-            logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
+            runTask(session);
         } catch (Throwable t) {
             logger.failedToRunScheduledTask(t, task.getClass().getSimpleName());
 
@@ -58,4 +58,12 @@ public class ScheduledTaskRunner implements Runnable {
         }
     }
 
+    protected void runTask(KeycloakSession session) {
+        session.getTransaction().begin();
+        task.run(session);
+        session.getTransaction().commit();
+
+        logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
+    }
+
 }
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
index 517d45e..5d327f4 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncDummyUserFederationProviderFactory.java
@@ -20,6 +20,8 @@ package org.keycloak.testsuite.federation.sync;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.logging.Logger;
 import org.keycloak.models.KeycloakSession;
@@ -37,6 +39,17 @@ import org.keycloak.testsuite.DummyUserFederationProviderFactory;
  */
 public class SyncDummyUserFederationProviderFactory extends DummyUserFederationProviderFactory {
 
+    // Used during SyncFederationTest
+    static volatile CountDownLatch latch1 = new CountDownLatch(1);
+    static volatile CountDownLatch latch2 = new CountDownLatch(1);
+
+    static void restartLatches() {
+        latch1 = new CountDownLatch(1);
+        latch2 = new CountDownLatch(1);
+    }
+
+
+
     private static final Logger logger = Logger.getLogger(SyncDummyUserFederationProviderFactory.class);
 
     public static final String SYNC_PROVIDER_ID = "sync-dummy";
@@ -68,7 +81,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
                 RealmModel realm = session.realms().getRealm(realmId);
 
                 // KEYCLOAK-2412 : Just remove and add some users for testing purposes
-                for (int i = 0; i<10; i++) {
+                for (int i = 0; i < 10; i++) {
                     String username = "dummyuser-" + i;
                     UserModel user = session.userStorage().getUserByUsername(username, realm);
 
@@ -83,7 +96,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
 
 
                 try {
-                    Thread.sleep(waitTime * 1000);
+                    latch1.await(waitTime * 1000, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException("Interrupted!", ie);
@@ -94,6 +107,9 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
 
         });
 
+        // countDown, so the SyncFederationTest can continue
+        latch2.countDown();
+
         return new UserFederationSyncResult();
     }
 
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
index 8259fe8..f3e2ec0 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/federation/sync/SyncFederationTest.java
@@ -19,6 +19,7 @@ package org.keycloak.testsuite.federation.sync;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -84,7 +85,8 @@ public class SyncFederationTest {
             sleep(1800);
 
             // Cancel timer
-            usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
+            RealmModel appRealm = session.realms().getRealmByName("test");
+            usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
 
             // Assert that DummyUserFederationProviderFactory.syncChangedUsers was invoked
             int newChanged = dummyFedFactory.getChangedSyncCounter();
@@ -111,7 +113,9 @@ public class SyncFederationTest {
     }
 
     @Test
-    public void test02ConcurrentSync() {
+    public void test02ConcurrentSync() throws Exception {
+        SyncDummyUserFederationProviderFactory.restartLatches();
+
         // Enable timer for SyncDummyUserFederationProvider
         keycloakRule.update(new KeycloakRule.KeycloakSetup() {
 
@@ -139,11 +143,17 @@ public class SyncFederationTest {
             Assert.assertTrue(syncResult.isIgnored());
 
             // Cancel timer
-            usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
+            usersSyncManager.notifyToRefreshPeriodicSync(session, realm, dummyModel, true);
+
+            // Signal to factory to finish waiting
+            SyncDummyUserFederationProviderFactory.latch1.countDown();
+
         } finally {
             keycloakRule.stopSession(session, true);
         }
 
+        SyncDummyUserFederationProviderFactory.latch2.await(20000, TimeUnit.MILLISECONDS);
+
         // remove provider
         keycloakRule.update(new KeycloakRule.KeycloakSetup() {
 
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
index 2d763b6..d224b3e 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java
@@ -26,6 +26,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.keycloak.cluster.ClusterProvider;
 import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
 import org.keycloak.models.ClientModel;
 import org.keycloak.models.ClientSessionModel;
@@ -83,7 +84,7 @@ public class UserSessionInitializerTest {
 
         // Create and persist offline sessions
         int started = Time.currentTime();
-        int serverStartTime = session.sessions().getClusterStartupTime();
+        int serverStartTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
 
         for (UserSessionModel origSession : origSessions) {
             UserSessionModel userSession = session.sessions().getUserSession(realm, origSession.getId());
diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
index 7a31d6d..452fd59 100644
--- a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
+++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/SyncDummyFederationProviderCommand.java
@@ -47,10 +47,11 @@ public class SyncDummyFederationProviderCommand extends AbstractCommand {
         } else {
             Map<String, String> cfg = fedProviderModel.getConfig();
             updateConfig(cfg, waitTime);
+            fedProviderModel.setChangedSyncPeriod(changedSyncPeriod);
             realm.updateUserFederationProvider(fedProviderModel);
         }
 
-        new UsersSyncManager().refreshPeriodicSyncForProvider(sessionFactory, session.getProvider(TimerProvider.class), fedProviderModel, "master");
+        new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, fedProviderModel, false);
 
         log.infof("User federation provider created and sync was started", waitTime);
     }