Concurrent update embedded caches and database

Closes #42374

Signed-off-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
Co-authored-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
Co-authored-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Pedro Ruivo 2025-09-15 19:38:03 +01:00 committed by GitHub
parent cdea7d79a7
commit 714d71b4f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 710 additions and 694 deletions

View File

@ -11,14 +11,13 @@ import java.util.Objects;
public class OpenTelemetrySpan<T> implements InfinispanSpan<T> {
private final Span span;
private final Scope scope;
public OpenTelemetrySpan(Span span) {
this.span = Objects.requireNonNull(span);
// TODO: This is actually wrong if you are doing asynchronous calls, but it allows the JGroups calls to be nested
// This should be fixed in ISPN 16+ so that it is no longer needed
// https://github.com/infinispan/infinispan/issues/15287
this.scope = span.makeCurrent();
//this.scope = span.makeCurrent();
}
@Override
@ -30,7 +29,6 @@ public class OpenTelemetrySpan<T> implements InfinispanSpan<T> {
@Override
public void complete() {
scope.close();
span.end();
}

View File

@ -55,28 +55,23 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
private final KeycloakSession session;
private final InfinispanKeyGenerator keyGenerator;
private final int authSessionsLimit;
protected final Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> cache;
protected final InfinispanChangelogBasedTransaction<String, RootAuthenticationSessionEntity> sessionTx;
protected final SessionEventsSenderTransaction clusterEventsSenderTx;
public InfinispanAuthenticationSessionProvider(KeycloakSession session, InfinispanKeyGenerator keyGenerator,
Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> cache, int authSessionsLimit, SerializeExecutionsByKey<String> serializer) {
InfinispanChangelogBasedTransaction<String, RootAuthenticationSessionEntity> sessionTx, int authSessionsLimit) {
this.session = session;
this.keyGenerator = keyGenerator;
this.authSessionsLimit = authSessionsLimit;
this.cache = cache;
this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, cache, SessionTimeouts::getAuthSessionLifespanMS, SessionTimeouts::getAuthSessionMaxIdleMS, serializer);
this.sessionTx = sessionTx;
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
session.getTransactionManager().enlistAfterCompletion(sessionTx);
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
}
@Override
public RootAuthenticationSessionModel createRootAuthenticationSession(RealmModel realm) {
String id = keyGenerator.generateKeyString(session, cache);
String id = keyGenerator.generateKeyString(session, sessionTx.getCache());
return createRootAuthenticationSession(realm, id);
}
@ -122,6 +117,7 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
}
protected void onRealmRemovedEvent(String realmId) {
Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> cache = sessionTx.getCache();
Iterator<Map.Entry<String, SessionEntityWrapper<RootAuthenticationSessionEntity>>> itr = CacheDecorators.localCache(cache)
.entrySet()
.stream()
@ -180,7 +176,7 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
}
public Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> getCache() {
return cache;
return sessionTx.getCache();
}
public InfinispanChangelogBasedTransaction<String, RootAuthenticationSessionEntity> getRootAuthSessionTransaction() {

View File

@ -20,9 +20,9 @@ package org.keycloak.models.sessions.infinispan;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterEvent;
@ -32,32 +32,38 @@ import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.cache.infinispan.events.AuthenticationSessionAuthNoteUpdateEvent;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.CacheHolder;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangesUtils;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticationSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.models.sessions.infinispan.events.AbstractAuthSessionClusterListener;
import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.transaction.InfinispanTransactionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanKeyGenerator;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener;
import org.keycloak.sessions.AuthenticationSessionProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanAuthenticationSessionProviderFactory implements AuthenticationSessionProviderFactory<InfinispanAuthenticationSessionProvider>, EnvironmentDependentProviderFactory {
public class InfinispanAuthenticationSessionProviderFactory implements AuthenticationSessionProviderFactory<InfinispanAuthenticationSessionProvider>, EnvironmentDependentProviderFactory, ProviderEventListener {
private static final Logger log = Logger.getLogger(InfinispanAuthenticationSessionProviderFactory.class);
private InfinispanKeyGenerator keyGenerator;
private volatile Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> authSessionsCache;
private final InfinispanKeyGenerator keyGenerator = new InfinispanKeyGenerator();
private CacheHolder<String, RootAuthenticationSessionEntity> cacheHolder;
private int authSessionsLimit;
@ -69,8 +75,6 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
public static final String REALM_REMOVED_AUTHSESSION_EVENT = "REALM_REMOVED_EVENT_AUTHSESSIONS";
SerializeExecutionsByKey<String> serializer = new SerializeExecutionsByKey<>();
@Override
public void init(Config.Scope config) {
authSessionsLimit = getAuthSessionsLimit(config);
@ -84,18 +88,10 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
@Override
public void postInit(KeycloakSessionFactory factory) {
keyGenerator = new InfinispanKeyGenerator();
factory.register(new ProviderEventListener() {
@Override
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent) {
KeycloakModelUtils.runJobInTransaction(factory, (KeycloakSession session) -> {
registerClusterListeners(session);
});
}
}
});
factory.register(this);
try (var session = factory.create()) {
cacheHolder = InfinispanChangesUtils.createWithCache(session, AUTHENTICATION_SESSIONS_CACHE_NAME, SessionTimeouts::getAuthSessionLifespanMS, SessionTimeouts::getAuthSessionMaxIdleMS);
}
}
@Override
@ -110,6 +106,13 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
.build();
}
@Override
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent pme) {
KeycloakModelUtils.runJobInTransaction(pme.getFactory(), this::registerClusterListeners);
}
}
protected void registerClusterListeners(KeycloakSession session) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
ClusterProvider cluster = session.getProvider(ClusterProvider.class);
@ -129,10 +132,12 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
@Override
public InfinispanAuthenticationSessionProvider create(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, SessionEntityWrapper<RootAuthenticationSessionEntity>> cache = connections.getCache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME);
this.authSessionsCache = cache;
return new InfinispanAuthenticationSessionProvider(session, keyGenerator, cache, authSessionsLimit, serializer);
return new InfinispanAuthenticationSessionProvider(session, keyGenerator, createTransaction(session), authSessionsLimit);
}
@Override
public Set<Class<? extends Provider>> dependsOn() {
return Set.of(InfinispanConnectionProvider.class, InfinispanTransactionProvider.class);
}
private void updateAuthNotes(ClusterEvent clEvent) {
@ -140,14 +145,14 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
return;
}
var distribution = authSessionsCache.getAdvancedCache().getDistributionManager();
var distribution = cacheHolder.cache().getAdvancedCache().getDistributionManager();
if (distribution != null && !distribution.getCacheTopology().getDistribution(event.getAuthSessionId()).isPrimary()) {
// Distribution is null for non-clustered caches (local-cache, used by start-dev mode).
// If not the primary owner of the key, skip event handling.
return;
}
SessionEntityWrapper<RootAuthenticationSessionEntity> authSession = this.authSessionsCache.get(event.getAuthSessionId());
SessionEntityWrapper<RootAuthenticationSessionEntity> authSession = cacheHolder.cache().get(event.getAuthSessionId());
updateAuthSession(authSession, event.getTabId(), event.getAuthNotesFragment());
}
@ -174,7 +179,7 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
}
}
this.authSessionsCache.replace(rootAuthSession.getId(), new SessionEntityWrapper<>(rootAuthSessionWrapper.getLocalMetadata(), rootAuthSession));
cacheHolder.cache().replace(rootAuthSession.getId(), new SessionEntityWrapper<>(rootAuthSessionWrapper.getLocalMetadata(), rootAuthSession));
}
@Override
@ -195,4 +200,10 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isEmbeddedInfinispan();
}
private InfinispanChangelogBasedTransaction<String, RootAuthenticationSessionEntity> createTransaction(KeycloakSession session) {
var tx = new InfinispanChangelogBasedTransaction<>(session, cacheHolder);
session.getProvider(InfinispanTransactionProvider.class).registerTransaction(tx);
return tx;
}
}

View File

@ -16,21 +16,24 @@
*/
package org.keycloak.models.sessions.infinispan;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.context.Flag;
import org.keycloak.models.KeycloakTransaction;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
public class InfinispanKeycloakTransaction implements KeycloakTransaction {
public class InfinispanKeycloakTransaction implements NonBlockingTransaction {
private final static Logger log = Logger.getLogger(InfinispanKeycloakTransaction.class);
@ -39,7 +42,7 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
*/
private static final CacheTask TOMBSTONE = new CacheTask() {
@Override
public void execute() {
public void execute(AggregateCompletionStage<Void> stage) {
// noop
}
@ -49,48 +52,24 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
}
};
public enum CacheOperation {
private final Map<Object, CacheTask> tasks = new HashMap<>();
private enum CacheOperation {
ADD_WITH_LIFESPAN, REMOVE, REPLACE
}
private boolean active;
private boolean rollback;
private final Map<Object, CacheTask> tasks = new LinkedHashMap<>();
@Override
public void begin() {
active = true;
}
@Override
public void commit() {
if (rollback) {
throw new RuntimeException("Rollback only!");
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
for (var task : tasks.values()) {
task.execute(stage);
}
tasks.values().forEach(CacheTask::execute);
}
@Override
public void rollback() {
public void asyncRollback(AggregateCompletionStage<Void> stage) {
tasks.clear();
}
@Override
public void setRollbackOnly() {
rollback = true;
}
@Override
public boolean getRollbackOnly() {
return rollback;
}
@Override
public boolean isActive() {
return active;
}
public <K, V> void put(BasicCache<K, V> cache, K key, V value, long lifespan, TimeUnit lifespanUnit) {
log.tracev("Adding cache operation: {0} on {1}", CacheOperation.ADD_WITH_LIFESPAN, key);
@ -100,8 +79,8 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
} else {
tasks.put(taskKey, new CacheTaskWithValue<V>(value, lifespan, lifespanUnit) {
@Override
public void execute() {
decorateCache(cache).put(key, value, lifespan, lifespanUnit);
public void execute(AggregateCompletionStage<Void> stage) {
stage.dependsOn(decorateCache(cache).putAsync(key, value, lifespan, lifespanUnit));
}
@Override
@ -133,8 +112,8 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
} else {
tasks.put(taskKey, new CacheTaskWithValue<V>(value, lifespan, lifespanUnit) {
@Override
public void execute() {
decorateCache(cache).replace(key, value, lifespan, lifespanUnit);
public void execute(AggregateCompletionStage<Void> stage) {
stage.dependsOn(decorateCache(cache).replaceAsync(key, value, lifespan, lifespanUnit));
}
@Override
@ -165,8 +144,8 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
tasks.put(taskKey, new CacheTask() {
@Override
public void execute() {
decorateCache(cache).remove(key);
public void execute(AggregateCompletionStage<Void> stage) {
stage.dependsOn(decorateCache(cache).removeAsync(key));
}
@Override
@ -196,26 +175,20 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
}
private static <K, V> Object getTaskKey(BasicCache<K, V> cache, K key) {
if (key instanceof String) {
return new StringBuilder(cache.getName())
.append("::")
.append(key).toString();
} else {
return key;
}
return key instanceof String ? cache.getName() + "::" + key : key;
}
public interface CacheTask {
void execute();
private interface CacheTask {
void execute(AggregateCompletionStage<Void> stage);
default Operation getOperation() {
return Operation.OTHER;
}
}
public enum Operation { PUT, REMOVE, OTHER }
private enum Operation { PUT, REMOVE, OTHER }
public static abstract class CacheTaskWithValue<V> implements CacheTask {
private static abstract class CacheTaskWithValue<V> implements CacheTask {
protected V value;
protected long lifespan;
protected TimeUnit lifespanUnit;
@ -242,9 +215,8 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
// Ignore return values. Should have better performance within cluster
private static <K, V> BasicCache<K, V> decorateCache(BasicCache<K, V> cache) {
if (cache instanceof RemoteCache)
return cache;
return ((Cache) cache).getAdvancedCache()
.withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP);
return cache instanceof Cache<K, V> c ?
c.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_REMOTE_LOOKUP) :
cache;
}
}

View File

@ -46,12 +46,11 @@ public class InfinispanSingleUseObjectProvider implements SingleUseObjectProvide
private final boolean persistRevokedTokens;
private final InfinispanKeycloakTransaction tx;
public InfinispanSingleUseObjectProvider(KeycloakSession session, BasicCache<String, SingleUseObjectValueEntity> singleUseObjectCache, boolean persistRevokedTokens) {
public InfinispanSingleUseObjectProvider(KeycloakSession session, BasicCache<String, SingleUseObjectValueEntity> singleUseObjectCache, boolean persistRevokedTokens, InfinispanKeycloakTransaction tx) {
this.session = session;
this.singleUseObjectCache = singleUseObjectCache;
this.persistRevokedTokens = persistRevokedTokens;
this.tx = new InfinispanKeycloakTransaction();
session.getTransactionManager().enlistAfterCompletion(tx);
this.tx = tx;
}
@Override

View File

@ -24,11 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.infinispan.Cache;
import org.infinispan.commons.api.BasicCache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
@ -39,6 +36,7 @@ import org.keycloak.models.SingleUseObjectProvider;
import org.keycloak.models.SingleUseObjectProviderFactory;
import org.keycloak.models.session.RevokedTokenPersisterProvider;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
import org.keycloak.models.sessions.infinispan.transaction.InfinispanTransactionProvider;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.Provider;
@ -46,6 +44,7 @@ import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ServerInfoAwareProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE;
import static org.keycloak.storage.datastore.DefaultDatastoreProviderFactory.setupClearExpiredRevokedTokensScheduledTask;
/**
@ -57,8 +56,6 @@ public class InfinispanSingleUseObjectProviderFactory implements SingleUseObject
public static final boolean DEFAULT_PERSIST_REVOKED_TOKENS = true;
public static final String LOADED = "loaded" + SingleUseObjectProvider.REVOKED_KEY;
private static final Logger LOG = Logger.getLogger(InfinispanSingleUseObjectProviderFactory.class);
protected BasicCache<String, SingleUseObjectValueEntity> singleUseObjectCache;
private volatile boolean initialized;
@ -66,19 +63,13 @@ public class InfinispanSingleUseObjectProviderFactory implements SingleUseObject
@Override
public Set<Class<? extends Provider>> dependsOn() {
return Set.of(InfinispanConnectionProvider.class);
return Set.of(InfinispanConnectionProvider.class, InfinispanTransactionProvider.class);
}
@Override
public InfinispanSingleUseObjectProvider create(KeycloakSession session) {
initialize(session);
return new InfinispanSingleUseObjectProvider(session, singleUseObjectCache, persistRevokedTokens);
}
static Supplier<BasicCache<String, SingleUseObjectValueEntity>> getSingleUseObjectCache(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache cache = connections.getCache(InfinispanConnectionProvider.ACTION_TOKEN_CACHE);
return () -> cache;
return new InfinispanSingleUseObjectProvider(session, singleUseObjectCache, persistRevokedTokens, createTransaction(session));
}
@Override
@ -113,8 +104,10 @@ public class InfinispanSingleUseObjectProviderFactory implements SingleUseObject
// It is necessary to put the cache initialization here, otherwise the cache would be initialized lazily, that
// means also listeners will start only after first cache initialization - that would be too late
if (singleUseObjectCache == null) {
InfinispanConnectionProvider connections = factory.create().getProvider(InfinispanConnectionProvider.class);
singleUseObjectCache = connections.getCache(InfinispanConnectionProvider.ACTION_TOKEN_CACHE);
try (var session = factory.create()) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
singleUseObjectCache = connections.getCache(ACTION_TOKEN_CACHE);
}
}
if (persistRevokedTokens) {
@ -172,5 +165,12 @@ public class InfinispanSingleUseObjectProviderFactory implements SingleUseObject
return builder.build();
}
private static InfinispanKeycloakTransaction createTransaction(KeycloakSession session) {
InfinispanTransactionProvider transactionProvider = session.getProvider(InfinispanTransactionProvider.class);
InfinispanKeycloakTransaction tx = new InfinispanKeycloakTransaction();
transactionProvider.registerTransaction(tx);
return tx;
}
}

View File

@ -24,7 +24,6 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserLoginFailureModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
@ -35,7 +34,6 @@ import org.keycloak.models.sessions.infinispan.events.SessionEventsSenderTransac
import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import java.util.concurrent.Future;
@ -52,20 +50,15 @@ public class InfinispanUserLoginFailureProvider implements UserLoginFailureProvi
protected final KeycloakSession session;
protected final Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailureCache;
protected final InfinispanChangelogBasedTransaction<LoginFailureKey, LoginFailureEntity> loginFailuresTx;
protected final SessionEventsSenderTransaction clusterEventsSenderTx;
public InfinispanUserLoginFailureProvider(KeycloakSession session,
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailureCache,
SerializeExecutionsByKey<LoginFailureKey> serializer) {
InfinispanChangelogBasedTransaction<LoginFailureKey, LoginFailureEntity> loginFailuresTx) {
this.session = session;
this.loginFailureCache = loginFailureCache;
this.loginFailuresTx = new InfinispanChangelogBasedTransaction<>(session, loginFailureCache, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs, serializer);
this.loginFailuresTx = loginFailuresTx;
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
session.getTransactionManager().enlistAfterCompletion(loginFailuresTx);
}
@ -113,7 +106,7 @@ public class InfinispanUserLoginFailureProvider implements UserLoginFailureProvi
FuturesHelper futures = new FuturesHelper();
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> localCache = CacheDecorators.localCache(loginFailureCache);
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> localCache = CacheDecorators.localCache(loginFailuresTx.getCache());
// Go through local cache data only
// entries from other nodes will be removed by each instance receiving the event

View File

@ -16,7 +16,8 @@
*/
package org.keycloak.models.sessions.infinispan;
import org.infinispan.Cache;
import java.util.Set;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
@ -27,34 +28,39 @@ import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.UserLoginFailureProvider;
import org.keycloak.models.UserLoginFailureProviderFactory;
import org.keycloak.models.UserModel;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.CacheHolder;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangesUtils;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.events.AbstractUserSessionClusterListener;
import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveAllUserLoginFailuresEvent;
import org.keycloak.models.sessions.infinispan.transaction.InfinispanTransactionProvider;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
/**
* @author <a href="mailto:mkanis@redhat.com">Martin Kanis</a>
*/
public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory<InfinispanUserLoginFailureProvider>, EnvironmentDependentProviderFactory {
public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory<InfinispanUserLoginFailureProvider>, EnvironmentDependentProviderFactory, ProviderEventListener {
private static final Logger log = Logger.getLogger(InfinispanUserLoginFailureProviderFactory.class);
public static final String REALM_REMOVED_SESSION_EVENT = "REALM_REMOVED_EVENT_SESSIONS";
public static final String REMOVE_ALL_LOGIN_FAILURES_EVENT = "REMOVE_ALL_LOGIN_FAILURES_EVENT";
SerializeExecutionsByKey<LoginFailureKey> serializer = new SerializeExecutionsByKey<>();
private CacheHolder<LoginFailureKey, LoginFailureEntity> cacheHolder;
@Override
public InfinispanUserLoginFailureProvider create(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);
return new InfinispanUserLoginFailureProvider(session, loginFailures, serializer);
return new InfinispanUserLoginFailureProvider(session, createTransaction(session));
}
@Override
@ -63,14 +69,15 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
@Override
public void postInit(final KeycloakSessionFactory factory) {
factory.register(event -> {
if (event instanceof PostMigrationEvent) {
KeycloakModelUtils.runJobInTransaction(factory, this::registerClusterListeners);
} else if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) {
UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId());
provider.removeUserLoginFailure(userRemovedEvent.getRealm(), userRemovedEvent.getUser().getId());
}
});
factory.register(this);
try (var session = factory.create()) {
cacheHolder = InfinispanChangesUtils.createWithCache(session, LOGIN_FAILURE_CACHE_NAME, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs);
}
}
@Override
public Set<Class<? extends Provider>> dependsOn() {
return Set.of(InfinispanConnectionProvider.class, InfinispanTransactionProvider.class);
}
protected void registerClusterListeners(KeycloakSession session) {
@ -122,4 +129,20 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isEmbeddedInfinispan();
}
@Override
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent pme) {
KeycloakModelUtils.runJobInTransaction(pme.getFactory(), this::registerClusterListeners);
} else if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) {
UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId());
provider.removeUserLoginFailure(userRemovedEvent.getRealm(), userRemovedEvent.getUser().getId());
}
}
private InfinispanChangelogBasedTransaction<LoginFailureKey, LoginFailureEntity> createTransaction(KeycloakSession session) {
var tx = new InfinispanChangelogBasedTransaction<>(session, cacheHolder);
session.getProvider(InfinispanTransactionProvider.class).registerTransaction(tx);
return tx;
}
}

View File

@ -56,7 +56,6 @@ import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
@ -108,22 +107,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
public InfinispanUserSessionProvider(KeycloakSession session,
PersisterLastSessionRefreshStore persisterLastSessionRefreshStore,
InfinispanKeyGenerator keyGenerator,
Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
InfinispanChangelogBasedTransaction<String, UserSessionEntity> sessionTx,
InfinispanChangelogBasedTransaction<String, UserSessionEntity> offlineSessionTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> offlineClientSessionTx,
SessionFunction<UserSessionEntity> offlineSessionCacheEntryLifespanAdjuster,
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster,
SerializeExecutionsByKey<String> serializerSession,
SerializeExecutionsByKey<String> serializerOfflineSession,
SerializeExecutionsByKey<UUID> serializerClientSession,
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster) {
this.session = session;
this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCache, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, serializerSession);
this.offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCache, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, serializerOfflineSession);
this.clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCache, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, serializerClientSession);
this.offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCache, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, serializerOfflineClientSession);
this.sessionTx = sessionTx;
this.offlineSessionTx = offlineSessionTx;
this.clientSessionTx = clientSessionTx;
this.offlineClientSessionTx = offlineClientSessionTx;
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
@ -133,10 +128,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
this.offlineClientSessionCacheEntryLifespanAdjuster = offlineClientSessionCacheEntryLifespanAdjuster;
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
session.getTransactionManager().enlistAfterCompletion(sessionTx);
session.getTransactionManager().enlistAfterCompletion(offlineSessionTx);
session.getTransactionManager().enlistAfterCompletion(clientSessionTx);
session.getTransactionManager().enlistAfterCompletion(offlineClientSessionTx);
}
protected Cache<String, SessionEntityWrapper<UserSessionEntity>> getCache(boolean offline) {

View File

@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -39,10 +40,13 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.changes.CacheHolder;
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangesUtils;
import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker;
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.UserSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
@ -50,12 +54,14 @@ import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.events.AbstractUserSessionClusterListener;
import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent;
import org.keycloak.models.sessions.infinispan.transaction.InfinispanTransactionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanKeyGenerator;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.models.utils.ResetTimeOffsetEvent;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ServerInfoAwareProviderFactory;
@ -81,16 +87,17 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
public static final String CONFIG_USE_BATCHES = "useBatches";
private static final boolean DEFAULT_USE_BATCHES = false;
private CacheHolder<String, UserSessionEntity> sessionCacheHolder;
private CacheHolder<String, UserSessionEntity> offlineSessionCacheHolder;
private CacheHolder<UUID, AuthenticatedClientSessionEntity> clientSessionCacheHolder;
private CacheHolder<UUID, AuthenticatedClientSessionEntity> offlineClientSessionCacheHolder;
private long offlineSessionCacheEntryLifespanOverride;
private long offlineClientSessionCacheEntryLifespanOverride;
private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
private InfinispanKeyGenerator keyGenerator;
final SerializeExecutionsByKey<String> serializerSession = new SerializeExecutionsByKey<>();
final SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
final SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
final SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate;
private PersistentSessionsWorker persistentSessionsWorker;
private int maxBatchSize;
@ -99,48 +106,26 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
@Override
public UserSessionProvider create(KeycloakSession session) {
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = null;
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionsCache = null;
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache = null;
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = null;
if (useCaches) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
cache = connections.getCache(USER_SESSION_CACHE_NAME);
offlineSessionsCache = connections.getCache(OFFLINE_USER_SESSION_CACHE_NAME);
clientSessionCache = connections.getCache(CLIENT_SESSION_CACHE_NAME);
offlineClientSessionsCache = connections.getCache(OFFLINE_CLIENT_SESSION_CACHE_NAME);
}
if (MultiSiteUtils.isPersistentSessionsEnabled()) {
var tx = createPersistentTransaction(session);
return new PersistentUserSessionProvider(
session,
keyGenerator,
cache,
offlineSessionsCache,
clientSessionCache,
offlineClientSessionsCache,
asyncQueuePersistentUpdate,
serializerSession,
serializerOfflineSession,
serializerClientSession,
serializerOfflineClientSession
tx.userTx,
tx.clientTx
);
}
var tx = createVolatileTransaction(session);
return new InfinispanUserSessionProvider(
session,
persisterLastSessionRefreshStore,
keyGenerator,
cache,
offlineSessionsCache,
clientSessionCache,
offlineClientSessionsCache,
tx.sessionTx,
tx.offlineSessionTx,
tx.clientSessionTx,
tx.offlineClientSessionTx,
this::deriveOfflineSessionCacheEntryLifespanMs,
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs,
serializerSession,
serializerOfflineSession,
serializerClientSession,
serializerOfflineClientSession
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs
);
}
@ -210,6 +195,29 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
maxBatchSize);
persistentSessionsWorker.start();
}
if (MultiSiteUtils.isPersistentSessionsEnabled()) {
if (useCaches) {
try (var session = factory.create()) {
sessionCacheHolder = InfinispanChangesUtils.createWithCache(session, USER_SESSION_CACHE_NAME, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs);
offlineSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, OFFLINE_USER_SESSION_CACHE_NAME, SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs);
clientSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, CLIENT_SESSION_CACHE_NAME, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs);
offlineClientSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, OFFLINE_CLIENT_SESSION_CACHE_NAME, SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs);
}
} else {
sessionCacheHolder = InfinispanChangesUtils.createWithoutCache(SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs);
offlineSessionCacheHolder = InfinispanChangesUtils.createWithoutCache(SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs);
clientSessionCacheHolder = InfinispanChangesUtils.createWithoutCache(SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs);
offlineClientSessionCacheHolder = InfinispanChangesUtils.createWithoutCache(SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs);
}
} else {
try (var session = factory.create()) {
sessionCacheHolder = InfinispanChangesUtils.createWithCache(session, USER_SESSION_CACHE_NAME, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs);
offlineSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, OFFLINE_USER_SESSION_CACHE_NAME, this::deriveOfflineSessionCacheEntryLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs);
clientSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, CLIENT_SESSION_CACHE_NAME, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs);
offlineClientSessionCacheHolder = InfinispanChangesUtils.createWithCache(session, OFFLINE_CLIENT_SESSION_CACHE_NAME, this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs);
}
}
}
public void initializePersisterLastSessionRefreshStore(final KeycloakSessionFactory sessionFactory) {
@ -363,5 +371,49 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
return builder.build();
}
@Override
public Set<Class<? extends Provider>> dependsOn() {
return Set.of(InfinispanConnectionProvider.class, InfinispanTransactionProvider.class);
}
private VolatileTransactions createVolatileTransaction(KeycloakSession session) {
var sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCacheHolder);
var offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCacheHolder);
var clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCacheHolder);
var offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCacheHolder);
var transactionProvider = session.getProvider(InfinispanTransactionProvider.class);
transactionProvider.registerTransaction(sessionTx);
transactionProvider.registerTransaction(offlineSessionTx);
transactionProvider.registerTransaction(clientSessionTx);
transactionProvider.registerTransaction(offlineClientSessionTx);
return new VolatileTransactions(sessionTx, offlineSessionTx, clientSessionTx, offlineClientSessionTx);
}
private PersistentTransaction createPersistentTransaction(KeycloakSession session) {
var sessionTx = new UserSessionPersistentChangelogBasedTransaction(session,
asyncQueuePersistentUpdate,
sessionCacheHolder,
offlineSessionCacheHolder);
var clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session,
asyncQueuePersistentUpdate,
clientSessionCacheHolder,
offlineClientSessionCacheHolder,
sessionTx);
var transactionProvider = session.getProvider(InfinispanTransactionProvider.class);
transactionProvider.registerTransaction(sessionTx);
transactionProvider.registerTransaction(clientSessionTx);
return new PersistentTransaction(sessionTx, clientSessionTx);
}
private record VolatileTransactions(InfinispanChangelogBasedTransaction<String, UserSessionEntity> sessionTx,
InfinispanChangelogBasedTransaction<String, UserSessionEntity> offlineSessionTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> offlineClientSessionTx) {}
private record PersistentTransaction(UserSessionPersistentChangelogBasedTransaction userTx, ClientSessionPersistentChangelogBasedTransaction clientTx) {}
}

View File

@ -66,8 +66,6 @@ import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.JpaChangesPerformer;
import org.keycloak.models.sessions.infinispan.changes.MergedUpdate;
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdatesList;
@ -103,11 +101,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
protected final KeycloakSession session;
protected final Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache;
protected final Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache;
protected final Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache;
protected final Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache;
protected final UserSessionPersistentChangelogBasedTransaction sessionTx;
protected final ClientSessionPersistentChangelogBasedTransaction clientSessionTx;
@ -117,58 +110,27 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
public PersistentUserSessionProvider(KeycloakSession session,
InfinispanKeyGenerator keyGenerator,
Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate,
SerializeExecutionsByKey<String> serializerSession,
SerializeExecutionsByKey<String> serializerOfflineSession,
SerializeExecutionsByKey<UUID> serializerClientSession,
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
UserSessionPersistentChangelogBasedTransaction sessionTx,
ClientSessionPersistentChangelogBasedTransaction clientSessionTx) {
if (!MultiSiteUtils.isPersistentSessionsEnabled()) {
throw new IllegalStateException("Persistent user sessions are not enabled");
}
this.session = session;
this.sessionCache = sessionCache;
this.clientSessionCache = clientSessionCache;
this.offlineSessionCache = offlineSessionCache;
this.offlineClientSessionCache = offlineClientSessionCache;
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session,
sessionCache, offlineSessionCache,
SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs,
SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs,
asyncQueuePersistentUpdate,
serializerSession,
serializerOfflineSession);
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session,
clientSessionCache, offlineClientSessionCache,
SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs,
SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs,
sessionTx,
asyncQueuePersistentUpdate,
serializerClientSession,
serializerOfflineClientSession);
this.sessionTx = sessionTx;
this.clientSessionTx = clientSessionTx;
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
this.keyGenerator = keyGenerator;
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
session.getTransactionManager().enlistAfterCompletion(sessionTx);
session.getTransactionManager().enlistAfterCompletion(clientSessionTx);
}
protected Cache<String, SessionEntityWrapper<UserSessionEntity>> getCache(boolean offline) {
return offline ? offlineSessionCache : sessionCache;
return sessionTx.getCache(offline);
}
protected Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> getClientSessionCache(boolean offline) {
return offline ? offlineClientSessionCache : clientSessionCache;
return clientSessionTx.getCache(offline);
}
@Override
@ -220,7 +182,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
public UserSessionModel createUserSession(String id, RealmModel realm, UserModel user, String loginUsername, String ipAddress,
String authMethod, boolean rememberMe, String brokerSessionId, String brokerUserId, UserSessionModel.SessionPersistenceState persistenceState) {
if (id == null) {
id = keyGenerator.generateKeyString(session, sessionCache);
id = keyGenerator.generateKeyString(session, sessionTx.getCache(false));
}
UserSessionEntity entity = new UserSessionEntity(id);
@ -902,8 +864,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
* in a future version of Keycloak.
*/
public void migrateNonPersistentSessionsToPersistentSessions() {
JpaChangesPerformer<String, UserSessionEntity> userSessionPerformer = new JpaChangesPerformer<>(sessionCache.getName(), new ArrayBlockingQueue<>(1));
JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer = new JpaChangesPerformer<>(clientSessionCache.getName(), new ArrayBlockingQueue<>(1));
var sessionCache = sessionTx.getCache(false);
var clientSessionCache = clientSessionTx.getCache(false);
JpaChangesPerformer<String, UserSessionEntity> userSessionPerformer = new JpaChangesPerformer<>(sessionCache.getName(), null);
JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer = new JpaChangesPerformer<>(clientSessionCache.getName(), null);
AtomicInteger currentBatch = new AtomicInteger(0);
var persistence = ComponentRegistry.componentOf(sessionCache, PersistenceManager.class);
if (persistence != null && !persistence.getStoresAsString().isEmpty()) {
@ -922,8 +886,8 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
sessionCache.clear();
clientSessionCache.clear();
// Even though offline sessions haven't been migrated, they are cleared as the IDs of the client sessions have changed. It is safe to clear them as they are already stored in the database.
offlineSessionCache.clear();
offlineClientSessionCache.clear();
sessionTx.getCache(true).clear();
clientSessionTx.getCache(true).clear();
log.infof("Migrated %d user sessions total.", currentBatch.intValue());
}
@ -938,6 +902,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
// ignoring old and unknown realm found in the session
return;
}
var clientSessionCache = clientSessionTx.getCache(false);
sessionEntityWrapper.getEntity().getAuthenticatedClientSessions().forEach((clientId, uuid) -> {
SessionEntityWrapper<AuthenticatedClientSessionEntity> clientSession = clientSessionCache.get(uuid);
if (clientSession != null) {
@ -963,8 +928,8 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
private <E extends SessionEntity, K> void flush(JpaChangesPerformer<K, E> userSessionsPerformer, JpaChangesPerformer<UUID, AuthenticatedClientSessionEntity> clientSessionPerformer) {
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
s -> {
userSessionsPerformer.applyChangesSynchronously(s);
clientSessionPerformer.applyChangesSynchronously(s);
userSessionsPerformer.write(s);
clientSessionPerformer.write(s);
});
userSessionsPerformer.clear();
clientSessionPerformer.clear();

View File

@ -0,0 +1,33 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.infinispan.util.concurrent.ActionSequencer;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
/**
* Groups the {@link Cache}, the {@link ActionSequencer} (used by replace method) and the {@link SessionFunction} to
* compute the lifespan, and the max-idle for this session entity.
*/
public record CacheHolder<K, V extends SessionEntity>(Cache<K, SessionEntityWrapper<V>> cache,
ActionSequencer sequencer,
SessionFunction<V> lifespanFunction,
SessionFunction<V> maxIdleFunction) {
}

View File

@ -26,7 +26,6 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.UserSessionAdapter;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
@ -45,17 +44,11 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineCache,
SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader,
SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader,
SessionFunction<AuthenticatedClientSessionEntity> offlineLifespanMsLoader,
SessionFunction<AuthenticatedClientSessionEntity> offlineMaxIdleTimeMsLoader,
UserSessionPersistentChangelogBasedTransaction userSessionTx,
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<UUID> serializerOnline,
SerializeExecutionsByKey<UUID> serializerOffline) {
super(session, CLIENT_SESSION_CACHE_NAME, cache, offlineCache, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
CacheHolder<UUID, AuthenticatedClientSessionEntity> cacheHolder,
CacheHolder<UUID, AuthenticatedClientSessionEntity> offlineCacheHolder,
UserSessionPersistentChangelogBasedTransaction userSessionTx) {
super(session, CLIENT_SESSION_CACHE_NAME, batchingQueue, cacheHolder, offlineCacheHolder);
this.userSessionTx = userSessionTx;
}

View File

@ -1,126 +0,0 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.models.sessions.infinispan.CacheDecorators;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
private final Cache<K, SessionEntityWrapper<V>> cache;
private final SerializeExecutionsByKey<K> serializer;
private final List<Runnable> changes = new LinkedList<>();
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache, SerializeExecutionsByKey<K> serializer) {
this.cache = cache;
this.serializer = serializer;
}
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
SessionUpdateTask.CacheOperation operation = task.getOperation();
// Don't need to run update of underlying entity. Local updates were already run
//task.runUpdate(session);
switch (operation) {
case REMOVE:
// Just remove it
CacheDecorators.ignoreReturnValues(cache).remove(key);
break;
case ADD:
CacheDecorators.ignoreReturnValues(cache)
.put(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
break;
case ADD_IF_ABSENT:
SessionEntityWrapper<V> existing = cache.putIfAbsent(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
if (existing != null) {
LOG.debugf("Existing entity in cache for key: %s . Will update it", key);
// Apply updates on the existing entity and replace it
task.runUpdate(existing.getEntity());
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs());
} else {
LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
break;
case REPLACE:
replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
break;
default:
throw new IllegalStateException("Unsupported state " + operation);
}
}
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
SessionEntityWrapper<V> returnValue = null;
int iteration = 0;
V session = oldVersion.getEntity();
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
returnValue = cache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
if (returnValue == null) {
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}
if (returnValue.getVersion().equals(newVersionEntity.getVersion())) {
if (LOG.isTraceEnabled()) {
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return;
}
oldVersion = returnValue;
session = oldVersion.getEntity();
task.runUpdate(session);
}
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
});
}
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
return new SessionEntityWrapper<>(localMetadata, entity);
}
@Override
public void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
changes.add(() -> runOperationInCluster(entry.getKey(), merged, entry.getValue().getEntityWrapper()));
}
@Override
public void applyChanges() {
changes.forEach(Runnable::run);
}
}

View File

@ -22,43 +22,35 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.Cache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.CacheDecorators;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction implements SessionsChangelogBasedTransaction<K, V> {
public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> implements SessionsChangelogBasedTransaction<K, V>, NonBlockingTransaction {
public static final Logger logger = Logger.getLogger(InfinispanChangelogBasedTransaction.class);
protected final KeycloakSession kcSession;
protected final Cache<K, SessionEntityWrapper<V>> cache;
protected final Map<K, SessionUpdatesList<V>> updates = new HashMap<>();
protected final CacheHolder<K, V> cacheHolder;
protected final SessionFunction<V> lifespanMsLoader;
protected final SessionFunction<V> maxIdleTimeMsLoader;
private final SerializeExecutionsByKey<K> serializer;
public InfinispanChangelogBasedTransaction(KeycloakSession kcSession, Cache<K, SessionEntityWrapper<V>> cache,
SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, SerializeExecutionsByKey<K> serializer) {
public InfinispanChangelogBasedTransaction(KeycloakSession kcSession, CacheHolder<K, V> cacheHolder) {
this.kcSession = kcSession;
this.cache = cache;
this.lifespanMsLoader = lifespanMsLoader;
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
this.serializer = serializer;
this.cacheHolder = cacheHolder;
}
@ -67,7 +59,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
SessionUpdatesList<V> myUpdates = updates.get(key);
if (myUpdates == null) {
// Lookup entity from cache
SessionEntityWrapper<V> wrappedEntity = cache.get(key);
SessionEntityWrapper<V> wrappedEntity = cacheHolder.cache().get(key);
if (wrappedEntity == null) {
logger.tracef("Not present cache item for key %s", key);
return;
@ -103,13 +95,14 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
}
}
@Deprecated(since = "26.4", forRemoval = true)
//unused method
public void reloadEntityInCurrentTransaction(RealmModel realm, K key, SessionEntityWrapper<V> entity) {
if (entity == null) {
throw new IllegalArgumentException("Null entity not allowed");
}
SessionEntityWrapper<V> latestEntity = cache.get(key);
SessionEntityWrapper<V> latestEntity = cacheHolder.cache().get(key);
if (latestEntity == null) {
return;
}
@ -128,7 +121,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
public SessionEntityWrapper<V> get(K key) {
SessionUpdatesList<V> myUpdates = updates.get(key);
if (myUpdates == null) {
SessionEntityWrapper<V> wrappedEntity = cache.get(key);
SessionEntityWrapper<V> wrappedEntity = cacheHolder.cache().get(key);
if (wrappedEntity == null) {
return null;
}
@ -150,7 +143,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
}
@Override
protected void commitImpl() {
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
for (Map.Entry<K, SessionUpdatesList<V>> entry : updates.entrySet()) {
SessionUpdatesList<V> sessionUpdates = entry.getValue();
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
@ -172,105 +165,28 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
RealmModel realm = sessionUpdates.getRealm();
long lifespanMs = lifespanMsLoader.apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
long maxIdleTimeMs = maxIdleTimeMsLoader.apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
long lifespanMs = cacheHolder.lifespanFunction().apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
long maxIdleTimeMs = cacheHolder.maxIdleFunction().apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
MergedUpdate<V> merged = MergedUpdate.computeUpdate(updateTasks, sessionWrapper, lifespanMs, maxIdleTimeMs);
if (merged != null) {
// Now run the operation in our cluster
runOperationInCluster(entry.getKey(), merged, sessionWrapper);
InfinispanChangesUtils.runOperationInCluster(cacheHolder, entry.getKey(), merged, sessionWrapper, stage, logger);
}
}
}
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
SessionUpdateTask.CacheOperation operation = task.getOperation();
// Don't need to run update of underlying entity. Local updates were already run
//task.runUpdate(session);
switch (operation) {
case REMOVE:
// Just remove it
CacheDecorators.ignoreReturnValues(cache).remove(key);
break;
case ADD:
CacheDecorators.ignoreReturnValues(cache)
.put(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
logger.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
break;
case ADD_IF_ABSENT:
SessionEntityWrapper<V> existing = cache.putIfAbsent(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
if (existing != null) {
logger.debugf("Existing entity in cache for key: %s . Will update it", key);
// Apply updates on the existing entity and replace it
task.runUpdate(existing.getEntity());
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs());
} else {
logger.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
break;
case REPLACE:
replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
break;
default:
throw new IllegalStateException("Unsupported state " + operation);
}
}
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
SessionEntityWrapper<V> returnValue = null;
int iteration = 0;
V session = oldVersion.getEntity();
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
if (session.shouldEvaluateRemoval() && task.shouldRemove(session)) {
logger.debugf("Entity %s removed after evaluation", key);
CacheDecorators.ignoreReturnValues(cache).remove(key);
return;
}
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
returnValue = cache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
if (returnValue == null) {
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}
if (returnValue.getVersion().equals(newVersionEntity.getVersion())){
if (logger.isTraceEnabled()) {
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return;
}
oldVersion = returnValue;
session = oldVersion.getEntity();
task.runUpdate(session);
}
logger.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
});
}
@Override
protected void rollbackImpl() {
public void asyncRollback(AggregateCompletionStage<Void> stage) {
updates.clear();
}
/**
* @return The {@link Cache} backing up this transaction.
*/
public Cache<K, SessionEntityWrapper<V>> getCache() {
return cache;
return cacheHolder.cache();
}
/**
@ -296,7 +212,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
// exists in transaction, avoid cache operation
return updatesList.getEntityWrapper().getEntity();
}
SessionEntityWrapper<V> existing = cache.putIfAbsent(key, session, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
SessionEntityWrapper<V> existing = cacheHolder.cache().putIfAbsent(key, session, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
if (existing == null) {
// keep track of the imported session for updates
updates.put(key, new SessionUpdatesList<>(realmModel, session));
@ -343,7 +259,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
//nothing to import, already expired
return;
}
var future = cache.putIfAbsentAsync(key, session, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
var future = cacheHolder.cache().putIfAbsentAsync(key, session, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
// write result into concurrent hash map because the consumer is invoked in a different thread each time.
stage.dependsOn(future.thenAccept(existing -> allSessions.put(key, existing == null ? session : existing)));
});
@ -352,8 +268,4 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
allSessions.forEach((key, wrapper) -> updates.put(key, new SessionUpdatesList<>(realmModel, wrapper)));
}
private static <V extends SessionEntity> SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
return new SessionEntityWrapper<>(localMetadata, entity);
}
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.ActionSequencer;
import org.jboss.logging.Logger;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.CacheDecorators;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
/**
* Utility methods for embedded and change-log based transaction
*/
public class InfinispanChangesUtils {
private InfinispanChangesUtils() {
}
public static <K, V extends SessionEntity> CacheHolder<K, V> createWithCache(KeycloakSession session,
String cacheName,
SessionFunction<V> lifespanFunction,
SessionFunction<V> maxIdleFunction) {
var connections = session.getProvider(InfinispanConnectionProvider.class);
var cache = connections.<K, SessionEntityWrapper<V>>getCache(cacheName);
var sequencer = new ActionSequencer(connections.getExecutor(cacheName + "Replace"), false, null);
return new CacheHolder<>(cache, sequencer, lifespanFunction, maxIdleFunction);
}
public static <K, V extends SessionEntity> CacheHolder<K, V> createWithoutCache(SessionFunction<V> lifespanFunction,
SessionFunction<V> maxIdleFunction) {
return new CacheHolder<>(null, null, lifespanFunction, maxIdleFunction);
}
public static <K, V extends SessionEntity> void runOperationInCluster(
CacheHolder<K, V> cacheHolder,
K key,
MergedUpdate<V> task,
SessionEntityWrapper<V> sessionWrapper,
AggregateCompletionStage<Void> stage,
Logger logger
) {
SessionUpdateTask.CacheOperation operation = task.getOperation();
// Don't need to run update of underlying entity. Local updates were already run
//task.runUpdate(session);
switch (operation) {
case REMOVE:
// Just remove it
stage.dependsOn(CacheDecorators.ignoreReturnValues(cacheHolder.cache()).removeAsync(key));
break;
case ADD:
CompletableFuture<?> future = CacheDecorators.ignoreReturnValues(cacheHolder.cache())
.putAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
if (logger.isTraceEnabled()) {
future = future.thenRun(() -> logger.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cacheHolder.cache().getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()));
}
stage.dependsOn(future);
break;
case ADD_IF_ABSENT:
CompletableFuture<Void> putIfAbsentFuture = cacheHolder.cache().putIfAbsentAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS)
.thenCompose(existing -> handlePutIfAbsentResponse(cacheHolder, existing, key, task, logger));
stage.dependsOn(putIfAbsentFuture);
break;
case REPLACE:
stage.dependsOn(replace(cacheHolder, key, task, sessionWrapper, logger));
break;
default:
throw new IllegalStateException("Unsupported state " + operation);
}
}
private static <K, V extends SessionEntity> CompletionStage<Void> handlePutIfAbsentResponse(
CacheHolder<K, V> cacheHolder,
SessionEntityWrapper<V> existing,
K key,
MergedUpdate<V> task,
Logger logger
) {
if (existing == null) {
if (logger.isTraceEnabled()) {
logger.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cacheHolder.cache().getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return CompletableFutures.completedNull();
}
if (logger.isDebugEnabled()) {
logger.debugf("Existing entity in cache for key: %s . Will update it", key);
}
// Apply updates on the existing entity and replace it
task.runUpdate(existing.getEntity());
return replace(cacheHolder, key, task, existing, logger);
}
private static <K, V extends SessionEntity> CompletionStage<Void> replace(
CacheHolder<K, V> cacheHolder,
K key,
MergedUpdate<V> task,
SessionEntityWrapper<V> oldVersionEntity,
Logger logger) {
return cacheHolder.sequencer().orderOnKey(key, () -> replaceIteration(cacheHolder.cache(), key, task, null, oldVersionEntity, 0, logger));
}
private static <K, V extends SessionEntity> CompletionStage<Void> replaceIteration(
Cache<K, SessionEntityWrapper<V>> cache,
K key,
MergedUpdate<V> task,
SessionEntityWrapper<V> previousSession,
SessionEntityWrapper<V> expectedSession,
int iteration,
Logger logger
) {
if (iteration >= InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
logger.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), previousSession, expectedSession);
return CompletableFutures.completedNull();
}
V session = expectedSession.getEntity();
if (session.shouldEvaluateRemoval() && task.shouldRemove(session)) {
logger.debugf("Entity %s removed after evaluation", key);
return CacheDecorators.ignoreReturnValues(cache).removeAsync(key).thenRun(CompletionStages.NO_OP_RUNNABLE);
}
SessionEntityWrapper<V> newVersionEntity = new SessionEntityWrapper<>(expectedSession.getLocalMetadata(), session);
CompletionStage<SessionEntityWrapper<V>> stage = cache.computeIfPresentAsync(key, new ReplaceFunction<>(expectedSession.getVersion(), newVersionEntity), task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
return stage.thenCompose(rv -> handleReplaceResponse(cache, key, task, expectedSession, newVersionEntity, rv, iteration + 1, logger));
}
private static <K, V extends SessionEntity> CompletionStage<Void> handleReplaceResponse(
Cache<K, SessionEntityWrapper<V>> cache,
K key,
MergedUpdate<V> task,
SessionEntityWrapper<V> expectedSession,
SessionEntityWrapper<V> newSession,
SessionEntityWrapper<V> returnValue,
int iteration,
Logger logger
) {
if (returnValue == null) {
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return CompletableFutures.completedNull();
}
if (returnValue.getVersion().equals(newSession.getVersion())) {
if (logger.isTraceEnabled()) {
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, expectedSession.getVersion(), newSession.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return CompletableFutures.completedNull();
}
task.runUpdate(returnValue.getEntity());
return replaceIteration(cache, key, task, expectedSession, returnValue, iteration + 1, logger);
}
}

View File

@ -17,6 +17,7 @@
package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.function.TriConsumer;
import org.jboss.logging.Logger;
import org.keycloak.models.AuthenticatedClientSessionModel;
@ -42,48 +43,114 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
public class JpaChangesPerformer<K, V extends SessionEntity> {
private static final Logger LOG = Logger.getLogger(JpaChangesPerformer.class);
private final String cacheName;
private final List<PersistentUpdate> changes = new LinkedList<>();
private final List<PersistentUpdate> changes;
private final TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor;
private final ArrayBlockingQueue<PersistentUpdate> batchingQueue;
private boolean warningShown = false;
public JpaChangesPerformer(String cacheName, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
this.cacheName = cacheName;
// The changes list is only used when batching is disabled.
this.changes = batchingQueue == null ? new ArrayList<>(2) : List.of();
this.batchingQueue = batchingQueue;
processor = processor();
processor = processor(cacheName);
}
@Override
/**
* Checks if this instance support non-blocking writes.
* <p>
* If this instance is non-blocking, the invoker must use
* {@link #asyncWrite(AggregateCompletionStage, Map.Entry, MergedUpdate)}.
* <p>
* Otherwise, the implementation must support {@link #registerChange(Map.Entry, MergedUpdate)} and
* {@link #write(KeycloakSession)}. The invoker should register the change using the first method and applied them
* in a blocking way using the later method.
*
* @return {@code true} if this instance is non-blocking.
* @see #asyncWrite(AggregateCompletionStage, Map.Entry, MergedUpdate)
* @see #registerChange(Map.Entry, MergedUpdate)
* @see #write(KeycloakSession)
*/
public boolean isNonBlocking() {
return batchingQueue != null;
}
/**
* Performs a non-blocking write into the database.
* <p>
* The implementation should register the {@link CompletionStage} into the {@link AggregateCompletionStage}.
*
* @param stage The {@link AggregateCompletionStage} to collect the {@link CompletionStage}.
* @param entry The {@link Map.Entry} with the ID and the session.
* @param merged The {@link MergedUpdate} to be applied to the existing session.
* @throws NullPointerException if this instance does not support non-blocking writes.
* @see #isNonBlocking()
*/
public void asyncWrite(AggregateCompletionStage<Void> stage, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
var update = newUpdate(entry, merged);
offer(update);
stage.dependsOn(update.future());
}
/**
* It queues a database write to be applied at a future invocation.
*
* @param entry The {@link Map.Entry} with the ID and the session.
* @param merged The {@link MergedUpdate} to be applied to the existing session.
* @throws UnsupportedOperationException if this instance does not support blocking writes.
* @see #isNonBlocking()
*/
public void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
changes.add(new PersistentUpdate(innerSession -> processor.accept(innerSession, entry, merged)));
changes.add(newUpdate(entry, merged));
}
private TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor() {
/**
* Applies all the pending write operation into the database.
*
* @param session The {@link KeycloakSession} to access the database.
*/
public void write(KeycloakSession session) {
changes.forEach(persistentUpdate -> persistentUpdate.perform(session));
}
/**
* Clears any pending blocking changes.
*
* @throws UnsupportedOperationException if this instance does not support blocking writes.
*/
public void clear() {
changes.clear();
}
private PersistentUpdate newUpdate(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
return new PersistentUpdate(innerSession -> processor.accept(innerSession, entry, merged));
}
private TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor(String cacheName) {
return switch (cacheName) {
case USER_SESSION_CACHE_NAME, OFFLINE_USER_SESSION_CACHE_NAME -> this::processUserSessionUpdate;
case CLIENT_SESSION_CACHE_NAME, OFFLINE_CLIENT_SESSION_CACHE_NAME -> this::processClientSessionUpdate;
case USER_SESSION_CACHE_NAME, OFFLINE_USER_SESSION_CACHE_NAME ->
JpaChangesPerformer::processUserSessionUpdate;
case CLIENT_SESSION_CACHE_NAME, OFFLINE_CLIENT_SESSION_CACHE_NAME ->
JpaChangesPerformer::processClientSessionUpdate;
default -> throw new IllegalStateException("Unexpected value: " + cacheName);
};
}
private boolean warningShown = false;
private void offer(PersistentUpdate update) {
if (!batchingQueue.offer(update)) {
if (!warningShown) {
@ -100,36 +167,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
}
}
@Override
public void applyChanges() {
if (!changes.isEmpty()) {
changes.forEach(this::offer);
List<Throwable> exceptions = new ArrayList<>();
CompletableFuture.allOf(changes.stream().map(f -> f.future().exceptionally(throwable -> {
exceptions.add(throwable);
return null;
})).toArray(CompletableFuture[]::new)).join();
// If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception
if (!exceptions.isEmpty()) {
RuntimeException ex = new RuntimeException("unable to complete the session updates");
exceptions.forEach(ex::addSuppressed);
throw ex;
}
clear();
}
}
public void applyChangesSynchronously(KeycloakSession session) {
if (!changes.isEmpty()) {
changes.forEach(persistentUpdate -> persistentUpdate.perform(session));
}
}
public void clear() {
changes.clear();
}
private void processClientSessionUpdate(KeycloakSession session, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
private static <K, V extends SessionEntity> void processClientSessionUpdate(KeycloakSession session, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
SessionUpdatesList<V> sessionUpdates = entry.getValue();
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
RealmModel realm = sessionUpdates.getRealm();
@ -146,7 +184,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
}
private void mergeClientSession(SessionEntityWrapper<V> sessionWrapper, UserSessionPersisterProvider userSessionPersister, RealmModel realm, SessionUpdatesList<V> sessionUpdates) {
private static <K, V extends SessionEntity> void mergeClientSession(SessionEntityWrapper<V> sessionWrapper, UserSessionPersisterProvider userSessionPersister, RealmModel realm, SessionUpdatesList<V> sessionUpdates) {
AuthenticatedClientSessionEntity entity = (AuthenticatedClientSessionEntity) sessionWrapper.getEntity();
ClientModel client = new ClientModelLazyDelegate(null) {
@Override
@ -398,7 +436,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
}, entity.isOffline());
}
private void processUserSessionUpdate(KeycloakSession session, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
private static <K, V extends SessionEntity> void processUserSessionUpdate(KeycloakSession session, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
SessionUpdatesList<V> sessionUpdates = entry.getValue();
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
RealmModel realm = sessionUpdates.getRealm();
@ -567,7 +605,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
}, entity.isOffline());
}
private void mergeUserSession(KeycloakSession innerSession, Map.Entry<K, SessionUpdatesList<V>> entry, PersistentUserSessionAdapter userSessionModel, RealmModel realm, SessionUpdatesList<V> sessionUpdates, UserSessionPersisterProvider userSessionPersister, UserSessionEntity entity) {
private static <K, V extends SessionEntity> void mergeUserSession(KeycloakSession innerSession, Map.Entry<K, SessionUpdatesList<V>> entry, PersistentUserSessionAdapter userSessionModel, RealmModel realm, SessionUpdatesList<V> sessionUpdates, UserSessionPersisterProvider userSessionPersister, UserSessionEntity entity) {
UserSessionEntity userSessionEntity = new UserSessionEntity(userSessionModel.getId()) {
@Override
public Map<String, String> getNotes() {

View File

@ -18,98 +18,63 @@
package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Retry;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
import org.keycloak.models.utils.KeycloakModelUtils;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction implements SessionsChangelogBasedTransaction<K, V> {
abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEntity> implements SessionsChangelogBasedTransaction<K, V>, NonBlockingTransaction {
private static final Logger LOG = Logger.getLogger(PersistentSessionsChangelogBasedTransaction.class);
protected final KeycloakSession kcSession;
protected final Map<K, SessionUpdatesList<V>> updates = new HashMap<>();
protected final Map<K, SessionUpdatesList<V>> offlineUpdates = new HashMap<>();
private final String cacheName;
private final Cache<K, SessionEntityWrapper<V>> cache;
private final Cache<K, SessionEntityWrapper<V>> offlineCache;
private final SessionFunction<V> lifespanMsLoader;
private final SessionFunction<V> maxIdleTimeMsLoader;
private final SessionFunction<V> offlineLifespanMsLoader;
private final SessionFunction<V> offlineMaxIdleTimeMsLoader;
private final ArrayBlockingQueue<PersistentUpdate> batchingQueue;
private final SerializeExecutionsByKey<K> serializerOnline;
private final SerializeExecutionsByKey<K> serializerOffline;
private final CacheHolder<K, V> cacheHolder;
private final CacheHolder<K, V> offlineCacheHolder;
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session,
String cacheName,
Cache<K, SessionEntityWrapper<V>> cache,
Cache<K, SessionEntityWrapper<V>> offlineCache,
SessionFunction<V> lifespanMsLoader,
SessionFunction<V> maxIdleTimeMsLoader,
SessionFunction<V> offlineLifespanMsLoader,
SessionFunction<V> offlineMaxIdleTimeMsLoader,
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<K> serializerOnline,
SerializeExecutionsByKey<K> serializerOffline) {
CacheHolder<K, V> cacheHolder,
CacheHolder<K, V> offlineCacheHolder) {
kcSession = session;
this.cacheName = cacheName;
this.cache = cache;
this.offlineCache = offlineCache;
this.lifespanMsLoader = lifespanMsLoader;
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
this.offlineLifespanMsLoader = offlineLifespanMsLoader;
this.offlineMaxIdleTimeMsLoader = offlineMaxIdleTimeMsLoader;
this.batchingQueue = batchingQueue;
this.serializerOnline = serializerOnline;
this.serializerOffline = serializerOffline;
this.cacheHolder = cacheHolder;
this.offlineCacheHolder = offlineCacheHolder;
}
protected Cache<K, SessionEntityWrapper<V>> getCache(boolean offline) {
if (offline) {
return offlineCache;
} else {
return cache;
}
public Cache<K, SessionEntityWrapper<V>> getCache(boolean offline) {
return offline ? offlineCacheHolder.cache() : cacheHolder.cache();
}
protected SessionFunction<V> getLifespanMsLoader(boolean offline) {
if (offline) {
return offlineLifespanMsLoader;
} else {
return lifespanMsLoader;
}
return offline ? offlineCacheHolder.lifespanFunction() : cacheHolder.lifespanFunction();
}
protected SessionFunction<V> getMaxIdleMsLoader(boolean offline) {
if (offline) {
return offlineMaxIdleTimeMsLoader;
} else {
return maxIdleTimeMsLoader;
}
return offline ? offlineCacheHolder.maxIdleFunction() : cacheHolder.maxIdleFunction();
}
protected Map<K, SessionUpdatesList<V>> getUpdates(boolean offline) {
if (offline) {
return offlineUpdates;
} else {
return updates;
}
return offline ? offlineUpdates : updates;
}
public SessionEntityWrapper<V> get(K key, boolean offline) {
@ -137,52 +102,9 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
}
}
List<SessionChangesPerformer<K, V>> prepareChangesPerformers() {
List<SessionChangesPerformer<K, V>> changesPerformers = new LinkedList<>();
if (batchingQueue != null) {
changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue));
} else {
changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) {
@Override
public void applyChanges() {
Retry.executeWithBackoff(
iteration -> KeycloakModelUtils.runJobInTransaction(kcSession.getKeycloakSessionFactory(), super::applyChangesSynchronously),
(iteration, t) -> {
if (iteration > 20) {
// Never retry more than 20 times.
throw new RuntimeException("Maximum number of retries reached", t);
}
}, PersistentSessionsWorker.UPDATE_TIMEOUT, PersistentSessionsWorker.UPDATE_BASE_INTERVAL_MILLIS);
clear();
}
});
}
if (cache != null) {
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
@Override
public boolean shouldConsumeChange(V entity) {
return !entity.isOffline();
}
});
}
if (offlineCache != null) {
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline) {
@Override
public boolean shouldConsumeChange(V entity) {
return entity.isOffline();
}
});
}
return changesPerformers;
}
@Override
protected void commitImpl() {
List<SessionChangesPerformer<K, V>> changesPerformers = null;
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
JpaChangesPerformer<K, V> persister = null;
for (Map.Entry<K, SessionUpdatesList<V>> entry : Stream.concat(updates.entrySet().stream(), offlineUpdates.entrySet().stream()).toList()) {
SessionUpdatesList<V> sessionUpdates = entry.getValue();
if (sessionUpdates.getUpdateTasks().isEmpty()) {
@ -203,18 +125,33 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
if (merged != null) {
if (changesPerformers == null) {
changesPerformers = prepareChangesPerformers();
var c = isOffline ? offlineCacheHolder : cacheHolder;
if (c.cache() != null) {
// Update cache. It is non-blocking.
InfinispanChangesUtils.runOperationInCluster(c, entry.getKey(), merged, entry.getValue().getEntityWrapper(), stage, LOG);
}
if (persister == null) {
persister =new JpaChangesPerformer<>(cacheName, batchingQueue);
if (!persister.isNonBlocking()) {
databaseUpdates.accept(persister::write);
}
}
if (persister.isNonBlocking()) {
// batching enabled, another thread will commit the changes.
persister.asyncWrite(stage, entry, merged);
} else {
// batching disabled, we queue, and we will execute the update later.
persister.registerChange(entry, merged);
}
changesPerformers.stream()
.filter(performer -> performer.shouldConsumeChange(entity))
.forEach(p -> p.registerChange(entry, merged));
}
}
}
if (changesPerformers != null) {
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
}
@Override
public void asyncRollback(AggregateCompletionStage<Void> stage) {
updates.clear();
offlineUpdates.clear();
}
@Override
@ -285,11 +222,6 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
getUpdates(entity.getEntity().isOffline()).put(key, newUpdates);
}
@Override
protected void rollbackImpl() {
}
/**
* Imports a session from an external source into the {@link Cache}.
* <p>

View File

@ -25,7 +25,6 @@ import org.jboss.logging.Logger;
import org.keycloak.common.util.Retry;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.ModelDuplicateException;
import org.keycloak.models.ModelIllegalStateException;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.tracing.TracingProvider;

View File

@ -25,7 +25,6 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
@ -39,16 +38,10 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session,
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache,
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineCache,
SessionFunction<UserSessionEntity> lifespanMsLoader,
SessionFunction<UserSessionEntity> maxIdleTimeMsLoader,
SessionFunction<UserSessionEntity> offlineLifespanMsLoader,
SessionFunction<UserSessionEntity> offlineMaxIdleTimeMsLoader,
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<String> serializerOnline,
SerializeExecutionsByKey<String> serializerOffline) {
super(session, USER_SESSION_CACHE_NAME, cache, offlineCache, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
CacheHolder<String, UserSessionEntity> cacheHolder,
CacheHolder<String, UserSessionEntity> offlineCacheHolder) {
super(session, USER_SESSION_CACHE_NAME, batchingQueue, cacheHolder, offlineCacheHolder);
}
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key, UserSessionModel userSession, boolean offline) {

View File

@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
@ -35,6 +36,7 @@ import org.keycloak.models.sessions.infinispan.changes.remote.remover.Conditiona
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
/**
@ -62,7 +64,7 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
}
@Override
public void asyncCommit(AggregateCompletionStage<Void> stage) {
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
conditionalRemover.executeRemovals(getCache(), stage);
for (var updater : entityChanges.values()) {

View File

@ -24,12 +24,14 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.jboss.logging.Logger;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
class RemoteInfinispanKeycloakTransaction<K, V, R extends ConditionalRemover<K, V>> implements NonBlockingTransaction {
@ -46,7 +48,7 @@ class RemoteInfinispanKeycloakTransaction<K, V, R extends ConditionalRemover<K,
}
@Override
public void asyncCommit(AggregateCompletionStage<Void> stage) {
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
conditionalRemover.executeRemovals(cache, stage);
tasks.values().stream()
.filter(this::shouldCommitOperation)

View File

@ -17,8 +17,11 @@
package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.util.function.Consumer;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.sessions.infinispan.transaction.DatabaseUpdate;
import org.keycloak.models.sessions.infinispan.transaction.NonBlockingTransaction;
/**
@ -42,11 +45,11 @@ public class UserSessionTransaction implements NonBlockingTransaction {
}
@Override
public void asyncCommit(AggregateCompletionStage<Void> stage) {
userSessions.asyncCommit(stage);
clientSessions.asyncCommit(stage);
offlineUserSessions.asyncCommit(stage);
offlineClientSessions.asyncCommit(stage);
public void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates) {
userSessions.asyncCommit(stage, databaseUpdates);
clientSessions.asyncCommit(stage, databaseUpdates);
offlineUserSessions.asyncCommit(stage, databaseUpdates);
offlineClientSessions.asyncCommit(stage, databaseUpdates);
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -15,18 +15,21 @@
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes;
package org.keycloak.models.sessions.infinispan.transaction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.KeycloakSession;
import java.util.Map;
/**
* A pending (blocking) database update.
*/
@FunctionalInterface
public interface DatabaseUpdate {
public interface SessionChangesPerformer<K, V extends SessionEntity> {
default boolean shouldConsumeChange(V entity) {
return true;
}
/**
* Write the database changes using the {@code session} provided.
*
* @param session The {@link KeycloakSession}.
*/
void write(KeycloakSession session);
void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged);
void applyChanges();
}

View File

@ -20,11 +20,17 @@ package org.keycloak.models.sessions.infinispan.transaction;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.keycloak.common.util.Retry;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* A {@link KeycloakTransaction} that collects {@link NonBlockingTransaction} to commit/rollback in a non-blocking
@ -35,6 +41,11 @@ import org.keycloak.models.KeycloakTransaction;
public class DefaultInfinispanTransactionProvider extends AbstractKeycloakTransaction implements InfinispanTransactionProvider {
private final List<NonBlockingTransaction> transactionList = new ArrayList<>(4);
private final KeycloakSession session;
public DefaultInfinispanTransactionProvider(KeycloakSession session) {
this.session = session;
}
@Override
public void registerTransaction(NonBlockingTransaction transaction) {
@ -49,7 +60,16 @@ public class DefaultInfinispanTransactionProvider extends AbstractKeycloakTransa
@Override
protected void commitImpl() {
final AggregateCompletionStage<Void> stage = CompletionStages.aggregateCompletionStage();
transactionList.forEach(transaction -> transaction.asyncCommit(stage));
final DatabaseWrites databaseWrites = new DatabaseWrites();
// sends all the cache requests and queues any pending database writes.
transactionList.forEach(transaction -> transaction.asyncCommit(stage, databaseWrites));
// all the cache requests has been sent
// apply the database changes in a blocking fashion, and in a single transaction.
commitDatabaseUpdates(databaseWrites);
// finally, wait for the completion of the cache updates.
CompletionStages.join(stage.freeze());
}
@ -59,4 +79,41 @@ public class DefaultInfinispanTransactionProvider extends AbstractKeycloakTransa
transactionList.forEach(transaction -> transaction.asyncRollback(stage));
CompletionStages.join(stage.freeze());
}
private void commitDatabaseUpdates(DatabaseWrites databaseWrites) {
if (databaseWrites.isEmpty()) {
return;
}
Retry.executeWithBackoff(
iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), databaseWrites),
(iteration, t) -> {
if (iteration > 20) {
// never retry more than 20 times
throw new RuntimeException("Maximum number of retries reached", t);
}
}, PersistentSessionsWorker.UPDATE_TIMEOUT, PersistentSessionsWorker.UPDATE_BASE_INTERVAL_MILLIS);
}
private static class DatabaseWrites implements KeycloakSessionTask, Consumer<DatabaseUpdate> {
private final List<DatabaseUpdate> databaseUpdateList = new ArrayList<>(2);
boolean isEmpty() {
return databaseUpdateList.isEmpty();
}
@Override
public void run(KeycloakSession session) {
databaseUpdateList.forEach(update -> update.write(session));
}
@Override
public void accept(DatabaseUpdate databaseUpdate) {
databaseUpdateList.add(databaseUpdate);
}
@Override
public String getTaskName() {
return "Database Update";
}
}
}

View File

@ -27,7 +27,7 @@ public class DefaultInfinispanTransactionProviderFactory implements InfinispanTr
@Override
public InfinispanTransactionProvider create(KeycloakSession session) {
var provider = new DefaultInfinispanTransactionProvider();
var provider = new DefaultInfinispanTransactionProvider(session);
session.getTransactionManager().enlistAfterCompletion(provider);
return provider;
}

View File

@ -18,6 +18,7 @@
package org.keycloak.models.sessions.infinispan.transaction;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
@ -35,10 +36,14 @@ public interface NonBlockingTransaction {
* <p>
* The implementation should not block the thread and add any (or none) {@link CompletionStage} into the
* {@code stage}.
* <p>
* Any blocking operation should be consumed by the {@code databaseUpdates}. It will be executed at a later
* instant.
*
* @param stage The {@link AggregateCompletionStage} to collect the {@link CompletionStage}.
* @param stage The {@link AggregateCompletionStage} to collect the {@link CompletionStage}.
* @param databaseUpdates The {@link Consumer} to use for blocking/database updates.
*/
void asyncCommit(AggregateCompletionStage<Void> stage);
void asyncCommit(AggregateCompletionStage<Void> stage, Consumer<DatabaseUpdate> databaseUpdates);
/**
* Asynchronously rollbacks the transaction.

View File

@ -325,19 +325,10 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
clientSessionQuery.setParameter("userSessionId", userSessionId);
clientSessionQuery.setParameter("offline", offlineStr);
Set<String> removedClientUUIDs = new HashSet<>();
closing(clientSessionQuery.getResultStream()).forEach(clientSession -> {
boolean added = addClientSessionToAuthenticatedClientSessionsIfPresent(userSession, clientSession);
if (!added) {
// client was removed in the meantime
removedClientUUIDs.add(clientSession.getClientId());
}
}
closing(clientSessionQuery.getResultStream()).forEach(clientSession ->
addClientSessionToAuthenticatedClientSessionsIfPresent(userSession, clientSession)
);
removedClientUUIDs.forEach(this::onClientRemoved);
return userSession;
}