Make sure LDAP sync runs in a single cluster node and respecting the configured period

Closes #43752

Signed-off-by: Pedro Igor <pigor.craveiro@gmail.com>
Signed-off-by: Alexander Schwartz <alexander.schwartz@ibm.com>
Co-authored-by: Alexander Schwartz <alexander.schwartz@ibm.com>
This commit is contained in:
Pedro Igor 2025-11-27 04:08:20 -03:00 committed by GitHub
parent 7167262909
commit 96aea99d6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 617 additions and 493 deletions

View File

@ -98,8 +98,8 @@ import org.keycloak.models.sessions.infinispan.stream.SessionUnwrapMapper;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.sessions.CommonClientSessionModel;
import org.keycloak.storage.UserStorageProviderClusterEvent;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.infinispan.protostream.FileDescriptorSource;
import org.infinispan.protostream.GeneratedSchema;
@ -126,7 +126,7 @@ import org.infinispan.protostream.types.java.CommonTypes;
CommonClientSessionModel.ExecutionStatus.class,
ComponentModel.MultiMapEntry.class,
UserStorageProviderModel.class,
UserStorageSyncManager.UserStorageProviderClusterEvent.class,
UserStorageProviderClusterEvent.class,
// clustering.infinispan package
LockEntry.class,

View File

@ -67,15 +67,15 @@ public final class Marshalling {
}
// Model
// see org.keycloak.models.UserSessionModel.State
/** see {@link org.keycloak.models.UserSessionModel.State} */
public static final int USER_STATE_ENUM = 65536;
// see org.keycloak.sessions.CommonClientSessionModel.ExecutionStatus
/** see {@link org.keycloak.sessions.CommonClientSessionModel.ExecutionStatus} */
public static final int CLIENT_SESSION_EXECUTION_STATUS = 65537;
// see org.keycloak.component.ComponentModel.MultiMapEntry
/** see {@link org.keycloak.component.ComponentModel.MultiMapEntry} */
public static final int MULTIMAP_ENTRY = 65538;
// see org.keycloak.storage.UserStorageProviderModel
/** see {@link org.keycloak.storage.UserStorageProviderModel} */
public static final int USER_STORAGE_PROVIDER_MODES = 65539;
// see org.keycloak.storage.managers.UserStorageSyncManager.UserStorageProviderClusterEvent
/** see {@link org.keycloak.storage.UserStorageProviderClusterEvent} */
public static final int USER_STORAGE_PROVIDER_CLUSTER_EVENT = 65540;
// clustering.infinispan package

View File

@ -0,0 +1,153 @@
package org.keycloak.storage;
import java.util.stream.Stream;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.models.StorageProviderRealmModel;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener;
import org.keycloak.storage.UserStorageProviderModel.SyncMode;
import org.keycloak.storage.user.ImportSynchronization;
import org.jboss.logging.Logger;
import static org.keycloak.models.utils.KeycloakModelUtils.runJobInTransaction;
public final class UserStorageEventListener implements ClusterListener, ProviderEventListener {
private static final Logger logger = Logger.getLogger(UserStorageEventListener.class);
private static final String USER_STORAGE_TASK_KEY = "user-storage";
private final KeycloakSessionFactory sessionFactory;
public UserStorageEventListener(KeycloakSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
public void eventReceived(ClusterEvent event) {
UserStorageProviderClusterEvent fedEvent = (UserStorageProviderClusterEvent) event;
String realmId = fedEvent.getRealmId();
runJobInTransaction(sessionFactory, session -> {
RealmModel realm = session.realms().getRealm(realmId);
if (realm == null) {
throw new RuntimeException("Failed to execute session task. Realm with id " + realmId + " not found.");
}
session.getContext().setRealm(realm);
refreshScheduledTasks(session, fedEvent.getStorageProvider(), fedEvent.isRemoved());
});
}
@Override
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent) {
runJobInTransaction(sessionFactory, session -> {
session.realms().getRealmsWithProviderTypeStream(UserStorageProvider.class)
.forEach(realm -> {
try {
session.getContext().setRealm(realm);
getUserStorageProvidersStream(realm).forEachOrdered(provider -> reScheduleTasks(session, provider));
} finally {
session.getContext().setRealm(null);
}
});
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
if (clusterProvider != null) {
clusterProvider.registerListener(USER_STORAGE_TASK_KEY, this);
}
});
} else if (event instanceof StoreSyncEvent ev) {
UserStorageProviderModel model = ev.getModel() == null ? null: new UserStorageProviderModel(ev.getModel());
KeycloakSession session = ev.getSession();
boolean removed = ev.getRemoved();
RealmModel contextRealm = session.getContext().getRealm();
RealmModel realm = ev.getRealm();
try {
session.getContext().setRealm(realm);
if (model != null) {
refreshScheduledTasks(session, model, removed);
notifyStoreSyncClusterUpdate(session, realm, model, removed);
} else {
getUserStorageProvidersStream(realm).forEachOrdered(fedProvider -> {
refreshScheduledTasks(session, fedProvider, removed);
notifyStoreSyncClusterUpdate(session, realm, fedProvider, removed);
});
}
} finally {
session.getContext().setRealm(contextRealm);
}
}
}
private void reScheduleTasks(KeycloakSession session, UserStorageProviderModel provider) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
RealmModel realm = session.getContext().getRealm();
if (!(factory instanceof ImportSynchronization)) {
logger.debugf("Not refreshing periodic sync settings for provider '%s' in realm '%s'", provider.getName(), realm.getName());
return;
}
logger.debugf("Going to refresh periodic sync settings for provider '%s' in realm '%s' with realmId '%s'. Full sync period: %d , changed users sync period: %d",
provider.getName(), realm.getName(), realm.getId(), provider.getFullSyncPeriod(), provider.getChangedSyncPeriod());
scheduleTask(session, provider, SyncMode.FULL);
scheduleTask(session, provider, SyncMode.CHANGED);
}
private void scheduleTask(KeycloakSession session, UserStorageProviderModel provider, SyncMode mode) {
UserStorageSyncTask task = new UserStorageSyncTask(provider, mode);
if (!task.schedule(session)) {
// cancel potentially dangling task
task.cancel(session);
}
}
// Ensure all cluster nodes are notified
private void notifyStoreSyncClusterUpdate(KeycloakSession session, RealmModel realm, UserStorageProviderModel provider, boolean removed) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization)) {
return;
}
ClusterProvider cp = session.getProvider(ClusterProvider.class);
if (cp != null) {
UserStorageProviderClusterEvent event = UserStorageProviderClusterEvent.createEvent(removed, realm.getId(), provider);
cp.notify(USER_STORAGE_TASK_KEY, event, true);
}
}
private void refreshScheduledTasks(KeycloakSession session, UserStorageProviderModel model, boolean removed) {
if (removed) {
new UserStorageSyncTask(model, SyncMode.FULL).cancel(session);
new UserStorageSyncTask(model, SyncMode.CHANGED).cancel(session);
} else {
reScheduleTasks(session, model);
}
}
private Stream<UserStorageProviderModel> getUserStorageProvidersStream(RealmModel realm) {
if (realm instanceof StorageProviderRealmModel s) {
return s.getUserStorageProvidersStream();
}
return Stream.empty();
}
}

View File

@ -83,7 +83,6 @@ import io.opentelemetry.api.trace.StatusCode;
import org.jboss.logging.Logger;
import static org.keycloak.models.utils.KeycloakModelUtils.runJobInTransaction;
import static org.keycloak.storage.managers.UserStorageSyncManager.notifyToRefreshPeriodicSync;
import static org.keycloak.utils.StreamsUtil.distinctByKey;
import static org.keycloak.utils.StreamsUtil.paginatedStream;
@ -996,7 +995,7 @@ public class UserStorageManager extends AbstractStorageManager<UserStorageProvid
if (!component.getProviderType().equals(UserStorageProvider.class.getName())) return;
localStorage().preRemove(realm, component);
if (getFederatedStorage() != null) getFederatedStorage().preRemove(realm, component);
notifyToRefreshPeriodicSync(session, realm, new UserStorageProviderModel(component), true);
StoreSyncEvent.fire(session, realm, component, true);
}
@Override
@ -1024,7 +1023,7 @@ public class UserStorageManager extends AbstractStorageManager<UserStorageProvid
session.getTransactionManager().enlistAfterCompletion(new AbstractKeycloakTransaction() {
@Override
protected void commitImpl() {
notifyToRefreshPeriodicSync(session, realm, new UserStorageProviderModel(model), false);
StoreSyncEvent.fire(session, realm, model, false);
}
@Override
@ -1046,7 +1045,7 @@ public class UserStorageManager extends AbstractStorageManager<UserStorageProvid
UserStorageProviderModel actual= new UserStorageProviderModel(newModel);
if (isSyncSettingsUpdated(previous, actual)) {
notifyToRefreshPeriodicSync(session, realm, actual, false);
StoreSyncEvent.fire(session, realm, actual, false);
}
}

View File

@ -18,14 +18,36 @@
package org.keycloak.storage;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserProvider;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.storage.UserStorageProviderModel.SyncMode;
import org.keycloak.storage.datastore.DefaultDatastoreProvider;
import org.keycloak.storage.user.SynchronizationResult;
/**
* @author Alexander Schwartz
*/
public class UserStoragePrivateUtil {
public static UserProvider userLocalStorage(KeycloakSession session) {
return ((DefaultDatastoreProvider) session.getProvider(DatastoreProvider.class)).userLocalStorage();
}
public static SynchronizationResult runFullSync(KeycloakSessionFactory sessionFactory, UserStorageProviderModel provider) {
return KeycloakModelUtils.runJobInTransactionWithResult(sessionFactory, session -> {
RealmModel realm = session.realms().getRealm(provider.getParentId());
session.getContext().setRealm(realm);
return new UserStorageSyncTask(provider, SyncMode.FULL).runWithResult(session);
});
}
public static SynchronizationResult runPeriodicSync(KeycloakSessionFactory sessionFactory, UserStorageProviderModel provider) {
return KeycloakModelUtils.runJobInTransactionWithResult(sessionFactory, session -> {
RealmModel realm = session.realms().getRealm(provider.getParentId());
session.getContext().setRealm(realm);
return new UserStorageSyncTask(provider, SyncMode.CHANGED).runWithResult(session);
});
}
}

View File

@ -0,0 +1,65 @@
package org.keycloak.storage;
import java.util.Objects;
import org.keycloak.cluster.ClusterEvent;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
// Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
@ProtoTypeId(65540)
public class UserStorageProviderClusterEvent implements ClusterEvent {
private boolean removed;
private String realmId;
private UserStorageProviderModel storageProvider;
@ProtoField(1)
public boolean isRemoved() {
return removed;
}
public void setRemoved(boolean removed) {
this.removed = removed;
}
@ProtoField(2)
public String getRealmId() {
return realmId;
}
public void setRealmId(String realmId) {
this.realmId = realmId;
}
@ProtoField(3)
public UserStorageProviderModel getStorageProvider() {
return storageProvider;
}
public void setStorageProvider(UserStorageProviderModel federationProvider) {
this.storageProvider = federationProvider;
}
public static UserStorageProviderClusterEvent createEvent(boolean removed, String realmId, UserStorageProviderModel provider) {
UserStorageProviderClusterEvent notification = new UserStorageProviderClusterEvent();
notification.setRemoved(removed);
notification.setRealmId(realmId);
notification.setStorageProvider(provider);
return notification;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserStorageProviderClusterEvent that = (UserStorageProviderClusterEvent) o;
return removed == that.removed && Objects.equals(realmId, that.realmId) && Objects.equals(storageProvider.getId(), that.storageProvider.getId());
}
@Override
public int hashCode() {
return Objects.hash(removed, realmId, storageProvider.getId());
}
}

View File

@ -0,0 +1,220 @@
package org.keycloak.storage;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.common.util.TriFunction;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.ModelIllegalStateException;
import org.keycloak.models.RealmModel;
import org.keycloak.storage.UserStorageProviderModel.SyncMode;
import org.keycloak.storage.user.ImportSynchronization;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.timer.ScheduledTask;
import org.keycloak.timer.TimerProvider;
import org.keycloak.timer.TimerProvider.TimerTaskContext;
import org.jboss.logging.Logger;
final class UserStorageSyncTask implements ScheduledTask {
private static final Logger logger = Logger.getLogger(UserStorageSyncTask.class);
private static final int TASK_EXECUTION_TIMEOUT = 30;
private final String providerId;
private final String realmId;
private final SyncMode syncMode;
private final int period;
UserStorageSyncTask(UserStorageProviderModel provider, SyncMode syncMode) {
this.providerId = provider.getId();
this.realmId = provider.getParentId();
this.syncMode = syncMode;
this.period = SyncMode.FULL.equals(syncMode) ? provider.getFullSyncPeriod() : provider.getChangedSyncPeriod();
}
@Override
public void run(KeycloakSession session) {
RealmModel realm = session.realms().getRealm(realmId);
session.getContext().setRealm(realm);
runWithResult(session);
}
@Override
public String getTaskName() {
return UserStorageSyncTask.class.getSimpleName() + "-" + providerId + "-" + syncMode;
}
SynchronizationResult runWithResult(KeycloakSession session) {
try {
UserStorageProviderModel provider = getStorageModel(session);
if (isSyncPeriod(provider)) {
return switch (syncMode) {
case FULL -> runFullSync(session);
case CHANGED -> runIncrementalSync(session);
};
}
logger.debugf("Ignored LDAP %s users-sync with storage provider %s due small time since last sync in realm %s", //
syncMode, provider.getName(), realmId);
return SynchronizationResult.ignored();
} catch (Throwable t) {
logger.errorf(t, "Error occurred during %s users-sync in realm %s and user provider %s", syncMode, realmId, providerId);
}
return SynchronizationResult.empty();
}
boolean schedule(KeycloakSession session) {
UserStorageProviderModel provider = getStorageModel(session);
if (isSchedulable(provider)) {
TimerProvider timer = session.getProvider(TimerProvider.class);
if (timer == null) {
logger.debugf("Timer provider not available. Not scheduling periodic sync task for provider '%s' in realm '%s'", provider.getName(), realmId);
return false;
}
logger.debugf("Scheduling user periodic sync task '%s' for user storage provider '%s' in realm '%s' with period %d seconds", getTaskName(), provider.getName(), realmId, period);
timer.scheduleTask(this, period * 1000L);
return true;
}
logger.debugf("Not scheduling periodic sync settings for provider '%s' in realm '%s'", provider.getName(), realmId);
return false;
}
void cancel(KeycloakSession session) {
TimerProvider timer = session.getProvider(TimerProvider.class);
if (timer == null) {
logger.debugf("Timer provider not available. Not cancelling periodic sync task for provider id '%s' in realm '%s'", providerId, realmId);
return;
}
UserStorageProviderModel provider = getStorageModel(session);
logger.debugf("Cancelling any running user periodic sync task '%s' for user storage provider provider '%s' in realm '%s'", getTaskName(), provider.getName(), realmId);
TimerTaskContext existingTask = timer.cancelTask(getTaskName());
if (existingTask != null) {
logger.debugf("Cancelled periodic sync task with task-name '%s' for provider with id '%s' and name '%s'",
getTaskName(), provider.getId(), provider.getName());
}
}
private UserStorageProviderModel getStorageModel(KeycloakSession session) {
RealmModel realm = session.getContext().getRealm();
if (realm == null) {
throw new ModelIllegalStateException("Realm with id " + realmId + " not found");
}
ComponentModel component = realm.getComponent(providerId);
if (component == null) {
cancel(session);
throw new ModelIllegalStateException("User storage provider with id " + providerId + " not found in realm " + realm.getName());
}
return new UserStorageProviderModel(component);
}
private SynchronizationResult runFullSync(KeycloakSession session) {
return runSync(session,
(sf, storage, model) -> storage.sync(sf, realmId, model));
}
private SynchronizationResult runIncrementalSync(KeycloakSession session) {
return runSync(session, (sf, storage, model) -> {
// See when we did last sync.
int oldLastSync = model.getLastSync();
return storage.syncSince(Time.toDate(oldLastSync), sf, realmId, model);
});
}
private SynchronizationResult runSync(KeycloakSession session, TriFunction<KeycloakSessionFactory, ImportSynchronization, UserStorageProviderModel, SynchronizationResult> syncFunction) {
UserStorageProviderModel provider = getStorageModel(session);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
ImportSynchronization factory = getProviderFactory(session, provider);
if (factory == null) {
return SynchronizationResult.ignored();
}
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = provider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(TASK_EXECUTION_TIMEOUT, period);
ExecutionResult<SynchronizationResult> task = clusterProvider.executeIfNotExecuted(taskKey, timeout, () -> {
// Need to load component again in this transaction for updated data
SynchronizationResult result = syncFunction.apply(sessionFactory, factory, provider);
if (!result.isIgnored()) {
updateLastSyncInterval(session);
}
return result;
});
SynchronizationResult result = task.getResult();
if (result == null || !task.isExecuted()) {
logger.debugf("syncing users for federation provider %s was ignored as it's already in progress", provider.getName());
return SynchronizationResult.ignored();
}
return result;
}
private ImportSynchronization getProviderFactory(KeycloakSession session, UserStorageProviderModel provider) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (factory instanceof ImportSynchronization f) {
return f;
}
return null;
}
// Update interval of last sync for given UserFederationProviderModel. Do it in separate transaction
private void updateLastSyncInterval(KeycloakSession session) {
UserStorageProviderModel provider = getStorageModel(session);
// Update persistent provider in DB
provider.setLastSync(Time.currentTime(), syncMode);
RealmModel realm = session.getContext().getRealm();
realm.updateComponent(provider);
}
// Skip syncing if there is short time since last sync time.
private boolean isSyncPeriod(UserStorageProviderModel provider) {
int lastSyncTime = provider.getLastSync(syncMode);
if (lastSyncTime <= 0) {
return true;
}
int currentTime = Time.currentTime();
int timeSinceLastSync = currentTime - lastSyncTime;
return timeSinceLastSync > period;
}
private boolean isSchedulable(UserStorageProviderModel provider) {
return provider.isImportEnabled() && provider.isEnabled() && period > 0;
}
}

View File

@ -39,8 +39,7 @@ import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.storage.DatastoreProvider;
import org.keycloak.storage.DatastoreProviderFactory;
import org.keycloak.storage.StoreMigrateRepresentationEvent;
import org.keycloak.storage.StoreSyncEvent;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.UserStorageEventListener;
import org.keycloak.timer.ScheduledTask;
import org.keycloak.timer.TimerProvider;
@ -74,7 +73,12 @@ public class DefaultDatastoreProviderFactory implements DatastoreProviderFactory
@Override
public void postInit(KeycloakSessionFactory factory) {
factory.register(this);
onClose = () -> factory.unregister(this);
UserStorageEventListener userStorageEventListener = new UserStorageEventListener(factory);
factory.register(userStorageEventListener);
onClose = () -> {
factory.unregister(this);
factory.unregister(userStorageEventListener);
};
}
@Override
@ -119,9 +123,6 @@ public class DefaultDatastoreProviderFactory implements DatastoreProviderFactory
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent) {
setupScheduledTasks(((PostMigrationEvent) event).getFactory());
} else if (event instanceof StoreSyncEvent) {
StoreSyncEvent ev = (StoreSyncEvent) event;
UserStorageSyncManager.notifyToRefreshPeriodicSyncAll(ev.getSession(), ev.getRealm(), ev.getRemoved());
} else if (event instanceof StoreMigrateRepresentationEvent) {
StoreMigrateRepresentationEvent ev = (StoreMigrateRepresentationEvent) event;
MigrationModelManager.migrateImport(ev.getSession(), ev.getRealm(), ev.getRep(), ev.isSkipUserDependent());
@ -141,8 +142,6 @@ public class DefaultDatastoreProviderFactory implements DatastoreProviderFactory
for (ScheduledTask task : getScheduledTasks()) {
scheduleTask(timer, sessionFactory, task, interval);
}
UserStorageSyncManager.bootstrapPeriodic(sessionFactory, timer);
}
protected static List<ScheduledTask> getScheduledTasks() {

View File

@ -1242,7 +1242,6 @@ public class DefaultExportImportManager implements ExportImportManager {
model.setFullSyncPeriod(fedModel.getFullSyncPeriod());
model.setPriority(fedModel.getPriority());
model.setChangedSyncPeriod(fedModel.getChangedSyncPeriod());
model.setLastSync(fedModel.getLastSync());
if (fedModel.getConfig() != null) {
for (Map.Entry<String, String> entry : fedModel.getConfig().entrySet()) {
model.getConfig().putSingle(entry.getKey(), entry.getValue());

View File

@ -1,393 +0,0 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.storage.managers;
import java.util.Objects;
import java.util.stream.Stream;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.common.util.TriFunction;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.RealmModel;
import org.keycloak.models.StorageProviderRealmModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.storage.UserStorageProvider;
import org.keycloak.storage.UserStorageProviderFactory;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.managers.UserStorageSyncManager.UserStorageSyncTask.SyncMode;
import org.keycloak.storage.user.ImportSynchronization;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.timer.TimerProvider;
import org.keycloak.timer.TimerProvider.TimerTaskContext;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.jboss.logging.Logger;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class UserStorageSyncManager {
private static final String USER_STORAGE_TASK_KEY = "user-storage";
private static final Logger logger = Logger.getLogger(UserStorageSyncManager.class);
/**
* Check federationProviderModel of all realms and possibly start periodic sync for them
*
* @param sessionFactory
* @param timer
*/
public static void bootstrapPeriodic(final KeycloakSessionFactory sessionFactory, final TimerProvider timer) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, session -> {
Stream<RealmModel> realms = session.realms().getRealmsWithProviderTypeStream(UserStorageProvider.class);
realms.forEach(realm -> {
Stream<UserStorageProviderModel> providers = ((StorageProviderRealmModel) realm).getUserStorageProvidersStream();
providers.forEachOrdered(provider -> {
refreshPeriodicSyncForProvider(session, provider, realm);
});
});
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
clusterProvider.registerListener(USER_STORAGE_TASK_KEY, new UserStorageClusterListener(sessionFactory));
});
}
private static class Holder {
ExecutionResult<SynchronizationResult> result;
}
public static SynchronizationResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserStorageProviderModel provider) {
return syncUsers(sessionFactory, provider.getChangedSyncPeriod(), realmId, provider, (sf, factory, pm) -> {
return factory.sync(sessionFactory, realmId, pm);
});
}
public static SynchronizationResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserStorageProviderModel provider) {
return syncUsers(sessionFactory, provider.getChangedSyncPeriod(), realmId, provider, (sf, factory, pm) -> {
// See when we did last sync.
int oldLastSync = provider.getLastSync();
return factory.syncSince(Time.toDate(oldLastSync), sessionFactory, realmId, provider);
});
}
private static SynchronizationResult syncUsers(final KeycloakSessionFactory sessionFactory, int period, final String realmId, final UserStorageProviderModel provider, TriFunction<KeycloakSessionFactory, ImportSynchronization, UserStorageProviderModel, SynchronizationResult> syncFunction) {
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization) || !provider.isImportEnabled() || !provider.isEnabled()) {
return SynchronizationResult.ignored();
}
Holder holder = new Holder();
// Ensure not executed concurrently on this or any other cluster node
KeycloakModelUtils.runJobInTransaction(sessionFactory, session -> {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = provider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(30, period);
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, () -> {
RealmModel realm = session.realms().getRealm(realmId);
session.getContext().setRealm(realm);
// Need to load component again in this transaction for updated data
ComponentModel storageComponent = realm.getComponent(provider.getId());
UserStorageProviderModel storageModel = new UserStorageProviderModel(storageComponent);
SynchronizationResult result = syncFunction.apply(sessionFactory, (ImportSynchronization) factory, storageModel);
if (!result.isIgnored()) {
updateLastSyncInterval(sessionFactory, storageModel, realmId, Time.currentTime());
}
return result;
});
});
if (holder.result == null || !holder.result.isExecuted()) {
logger.debugf("syncing users for federation provider %s was ignored as it's already in progress", provider.getName());
return SynchronizationResult.ignored();
} else {
return holder.result.getResult();
}
}
public static void notifyToRefreshPeriodicSyncAll(KeycloakSession session, RealmModel realm, boolean removed) {
((StorageProviderRealmModel) realm).getUserStorageProvidersStream().forEachOrdered(fedProvider ->
notifyToRefreshPeriodicSync(session, realm, fedProvider, removed));
}
// Ensure all cluster nodes are notified
public static void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserStorageProviderModel provider, boolean removed) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization)) {
return;
}
ClusterProvider cp = session.getProvider(ClusterProvider.class);
if (cp != null) {
UserStorageProviderClusterEvent event = UserStorageProviderClusterEvent.createEvent(removed, realm.getId(), provider);
cp.notify(USER_STORAGE_TASK_KEY, event, false);
}
}
// Executed once it receives notification that some UserFederationProvider was created or updated
protected static void refreshPeriodicSyncForProvider(KeycloakSession session, UserStorageProviderModel provider, RealmModel realm) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageProviderFactory<?> factory = (UserStorageProviderFactory<?>) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization)) {
logger.debugf("Not refreshing periodic sync settings for provider '%s' in realm '%s'", provider.getName(), realm.getName());
return;
}
logger.debugf("Going to refresh periodic sync settings for provider '%s' in realm '%s' with realmId '%s'. Full sync period: %d , changed users sync period: %d",
provider.getName(), realm.getName(), realm.getId(), provider.getFullSyncPeriod(), provider.getChangedSyncPeriod());
scheduleUserStorageSyncTask(session, provider, UserStorageSyncTask.SyncMode.FULL, provider.getFullSyncPeriod(), realm);
scheduleUserStorageSyncTask(session, provider, SyncMode.CHANGED, provider.getChangedSyncPeriod(), realm);
}
private static void scheduleUserStorageSyncTask(KeycloakSession session, UserStorageProviderModel provider, UserStorageSyncTask.SyncMode mode, int period, RealmModel realm) {
String syncTaskName = createSyncTaskName(provider, mode);
TimerProvider timer = session.getProvider(TimerProvider.class);
if (timer == null) {
logger.warnf("TimerProvider not available. Can't schedule user storage sync task for provider '%s' in realm '%s'", provider.getName(), realm.getName());
return;
}
if (provider.isImportEnabled() && provider.isEnabled() && period > 0) {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
UserStorageSyncTask task = new UserStorageSyncTask(provider, realm, sessionFactory, mode);
logger.debugf("Scheduling user periodic sync task '%s' for user storage provider provider '%s' in realm '%s' with period %d seconds", syncTaskName, provider.getName(), realm.getName(), provider.getChangedSyncPeriod());
timer.schedule(task, period * 1000L, syncTaskName);
} else {
if (!provider.isEnabled()) {
logger.debugf("Not scheduling periodic sync settings for provider '%s' in realm '%s'. Provider is disabled", provider.getName(), realm.getName());
return;
}
if (!provider.isImportEnabled()) {
logger.debugf("Not scheduling periodic sync settings for provider '%s' in realm '%s'. Import users is disabled", provider.getName(), realm.getName());
return;
}
// cancel potentially dangling task
logger.debugf("Cancelling any running user periodic sync task '%s' for user storage provider provider '%s' in realm '%s'", syncTaskName, provider.getName(), realm.getName());
timer.cancelTask(syncTaskName);
}
}
public static class UserStorageSyncTask implements Runnable {
public enum SyncMode {
FULL, CHANGED
}
private final UserStorageProviderModel provider;
private final RealmModel realm;
private final KeycloakSessionFactory sessionFactory;
private final SyncMode syncMode;
public UserStorageSyncTask(UserStorageProviderModel provider, RealmModel realm, KeycloakSessionFactory sessionFactory, SyncMode syncMode) {
this.provider = provider;
this.realm = realm;
this.sessionFactory = sessionFactory;
this.syncMode = syncMode;
}
@Override
public void run() {
try {
boolean shouldPerformSync = shouldPerformNewPeriodicSync(provider.getLastSync(), provider.getChangedSyncPeriod());
if (!shouldPerformSync) {
logger.debugf("Ignored periodic %s users-sync with storage provider %s due small time since last sync in realm %s", //
syncMode, provider.getName(), realm.getName());
return;
}
switch (syncMode) {
case FULL:
syncAllUsers(sessionFactory, realm.getId(), provider);
break;
case CHANGED:
syncChangedUsers(sessionFactory, realm.getId(), provider);
break;
}
} catch (Throwable t) {
logger.errorf(t, "Error occurred during %s users-sync in realm %s", //
syncMode, realm.getName());
}
}
}
public static String createSyncTaskName(UserStorageProviderModel model, UserStorageSyncTask.SyncMode syncMode) {
return UserStorageSyncTask.class.getSimpleName() + "-" + model.getId() + "-" + syncMode;
}
// Skip syncing if there is short time since last sync time.
private static boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
if (lastSyncTime <= 0) {
return true;
}
int currentTime = Time.currentTime();
int timeSinceLastSync = currentTime - lastSyncTime;
return (timeSinceLastSync * 2 > period);
}
// Executed once it receives notification that some UserFederationProvider was removed
protected static void removePeriodicSyncForProvider(TimerProvider timer, UserStorageProviderModel fedProvider) {
cancelPeriodicSyncForProviderIfPresent(timer, fedProvider, UserStorageSyncTask.SyncMode.FULL);
cancelPeriodicSyncForProviderIfPresent(timer, fedProvider, UserStorageSyncTask.SyncMode.CHANGED);
}
protected static void cancelPeriodicSyncForProviderIfPresent(TimerProvider timer, UserStorageProviderModel providerModel, UserStorageSyncTask.SyncMode syncMode) {
String taskName = createSyncTaskName(providerModel, syncMode);
TimerTaskContext existingTask = timer.cancelTask(taskName);
if (existingTask != null) {
logger.debugf("Cancelled periodic sync task with task-name '%s' for provider with id '%s' and name '%s'",
taskName, providerModel.getId(), providerModel.getName());
}
}
// Update interval of last sync for given UserFederationProviderModel. Do it in separate transaction
private static void updateLastSyncInterval(final KeycloakSessionFactory sessionFactory, UserStorageProviderModel provider, final String realmId, final int lastSync) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
RealmModel persistentRealm = session.realms().getRealm(realmId);
((StorageProviderRealmModel) persistentRealm).getUserStorageProvidersStream()
.filter(persistentFedProvider -> Objects.equals(provider.getId(), persistentFedProvider.getId()))
.forEachOrdered(persistentFedProvider -> {
// Update persistent provider in DB
persistentFedProvider.setLastSync(lastSync);
persistentRealm.updateComponent(persistentFedProvider);
// Update "cached" reference
provider.setLastSync(lastSync);
});
}
});
}
private static class UserStorageClusterListener implements ClusterListener {
private final KeycloakSessionFactory sessionFactory;
public UserStorageClusterListener(KeycloakSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
public void eventReceived(ClusterEvent event) {
final UserStorageProviderClusterEvent fedEvent = (UserStorageProviderClusterEvent) event;
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
TimerProvider timer = session.getProvider(TimerProvider.class);
if (fedEvent.isRemoved()) {
removePeriodicSyncForProvider(timer, fedEvent.getStorageProvider());
} else {
RealmModel realm = session.realms().getRealm(fedEvent.getRealmId());
refreshPeriodicSyncForProvider(session, fedEvent.getStorageProvider(), realm);
}
}
});
}
}
// Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
@ProtoTypeId(65540)
public static class UserStorageProviderClusterEvent implements ClusterEvent {
private boolean removed;
private String realmId;
private UserStorageProviderModel storageProvider;
@ProtoField(1)
public boolean isRemoved() {
return removed;
}
public void setRemoved(boolean removed) {
this.removed = removed;
}
@ProtoField(2)
public String getRealmId() {
return realmId;
}
public void setRealmId(String realmId) {
this.realmId = realmId;
}
@ProtoField(3)
public UserStorageProviderModel getStorageProvider() {
return storageProvider;
}
public void setStorageProvider(UserStorageProviderModel federationProvider) {
this.storageProvider = federationProvider;
}
public static UserStorageProviderClusterEvent createEvent(boolean removed, String realmId, UserStorageProviderModel provider) {
UserStorageProviderClusterEvent notification = new UserStorageProviderClusterEvent();
notification.setRemoved(removed);
notification.setRealmId(realmId);
notification.setStorageProvider(provider);
return notification;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserStorageProviderClusterEvent that = (UserStorageProviderClusterEvent) o;
return removed == that.removed && Objects.equals(realmId, that.realmId) && Objects.equals(storageProvider.getId(), that.storageProvider.getId());
}
@Override
public int hashCode() {
return Objects.hash(removed, realmId, storageProvider.getId());
}
}
}

View File

@ -35,16 +35,17 @@ import org.keycloak.common.ClientConnection;
import org.keycloak.component.ComponentModel;
import org.keycloak.events.admin.OperationType;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.services.ErrorResponse;
import org.keycloak.services.ServicesLogger;
import org.keycloak.services.managers.LDAPServerCapabilitiesManager;
import org.keycloak.services.resources.admin.fgap.AdminPermissionEvaluator;
import org.keycloak.storage.UserStoragePrivateUtil;
import org.keycloak.storage.UserStorageProvider;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.ldap.LDAPStorageProvider;
import org.keycloak.storage.ldap.mappers.LDAPStorageMapper;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.SynchronizationResult;
import org.jboss.logging.Logger;
@ -149,18 +150,19 @@ public class UserStorageProviderResource {
logger.debug("Syncing users");
UserStorageSyncManager syncManager = new UserStorageSyncManager();
SynchronizationResult syncResult;
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
if ("triggerFullSync".equals(action)) {
try {
syncResult = syncManager.syncAllUsers(session.getKeycloakSessionFactory(), realm.getId(), providerModel);
syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, providerModel);
} catch(Exception e) {
String errorMsg = getErrorCode(e);
throw ErrorResponse.error(errorMsg, Response.Status.BAD_REQUEST);
}
} else if ("triggerChangedUsersSync".equals(action)) {
try {
syncResult = syncManager.syncChangedUsers(session.getKeycloakSessionFactory(), realm.getId(), providerModel);
syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, providerModel);
} catch(Exception e) {
String errorMsg = getErrorCode(e);
throw ErrorResponse.error(errorMsg, Response.Status.BAD_REQUEST);

View File

@ -36,6 +36,10 @@ public class UserStorageProviderModel extends CacheableStorageProviderModel {
public static final String LAST_SYNC = "lastSync";
public static final String REMOVE_INVALID_USERS_ENABLED = "removeInvalidUsersEnabled";
public enum SyncMode {
FULL, CHANGED;
}
public UserStorageProviderModel() {
setProviderType(UserStorageProvider.class.getName());
}
@ -46,7 +50,6 @@ public class UserStorageProviderModel extends CacheableStorageProviderModel {
private transient Integer fullSyncPeriod;
private transient Integer changedSyncPeriod;
private transient Integer lastSync;
private transient Boolean importEnabled;
private transient Boolean removeInvalidUsersEnabled;
@ -98,20 +101,24 @@ public class UserStorageProviderModel extends CacheableStorageProviderModel {
}
public int getLastSync() {
if (lastSync == null) {
String val = getConfig().getFirst(LAST_SYNC);
if (val == null) {
lastSync = 0;
} else {
lastSync = Integer.valueOf(val);
}
return Math.max(getLastSync(SyncMode.FULL), getLastSync(SyncMode.CHANGED));
}
public int getLastSync(SyncMode syncMode) {
String val = getConfig().getFirst(LAST_SYNC + "_" + syncMode.name());
if (val == null) {
return 0;
}
return lastSync;
return Integer.parseInt(val);
}
public void setLastSync(int lastSync, SyncMode syncMode) {
getConfig().putSingle(LAST_SYNC + "_" + syncMode.name(), Integer.toString(lastSync));
}
public void setLastSync(int lastSync) {
this.lastSync = lastSync;
getConfig().putSingle(LAST_SYNC, Integer.toString(lastSync));
setLastSync(lastSync, SyncMode.FULL);
setLastSync(lastSync, SyncMode.CHANGED);
}
public boolean isRemoveInvalidUsersEnabled() {

View File

@ -16,6 +16,7 @@
*/
package org.keycloak.storage;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.provider.ProviderEvent;
@ -29,10 +30,16 @@ public class StoreSyncEvent implements ProviderEvent {
private final KeycloakSession session;
private final RealmModel realm;
private final boolean removed;
private final ComponentModel model;
public StoreSyncEvent(KeycloakSession session, RealmModel realm, boolean removed) {
private StoreSyncEvent(KeycloakSession session, RealmModel realm, boolean removed) {
this(session, realm, null, removed);
}
private StoreSyncEvent(KeycloakSession session, RealmModel realm, ComponentModel model, boolean removed) {
this.session = session;
this.realm = realm;
this.model = model;
this.removed = removed;
}
@ -40,6 +47,10 @@ public class StoreSyncEvent implements ProviderEvent {
session.getKeycloakSessionFactory().publish(new StoreSyncEvent(session, realm, removed));
}
public static void fire(KeycloakSession session, RealmModel realm, ComponentModel model, boolean removed) {
session.getKeycloakSessionFactory().publish(new StoreSyncEvent(session, realm, model, removed));
}
public KeycloakSession getSession() {
return session;
}
@ -48,8 +59,11 @@ public class StoreSyncEvent implements ProviderEvent {
return realm;
}
public ComponentModel getModel() {
return model;
}
public boolean getRemoved() {
return removed;
}
}

View File

@ -297,8 +297,18 @@ public class RealmManager {
authSessions.onRealmRemoved(realm);
}
// Refresh periodic sync tasks for configured storageProviders
StoreSyncEvent.fire(session, realm, true);
session.getTransactionManager().enlistAfterCompletion(new AbstractKeycloakTransaction() {
@Override
protected void commitImpl() {
// Refresh periodic sync tasks for configured storageProviders
StoreSyncEvent.fire(session, realm, true);
}
@Override
protected void rollbackImpl() {
// nothing to rollback
}
});
}
return removed;
}

View File

@ -35,7 +35,6 @@ import org.keycloak.storage.UserStorageProvider;
import org.keycloak.storage.ldap.LDAPStorageProviderFactory;
import org.keycloak.storage.ldap.idm.model.LDAPObject;
import org.keycloak.storage.ldap.kerberos.LDAPProviderKerberosConfig;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.testsuite.KerberosEmbeddedServer;
import org.keycloak.testsuite.federation.ldap.LDAPTestAsserts;
@ -105,7 +104,6 @@ public class KerberosLdapTest extends AbstractKerberosSingleRealmTest {
RealmModel testRealm = ctx.getRealm();
ctx.getLdapModel().getConfig().putSingle(LDAPConstants.EDIT_MODE, UserStorageProvider.EditMode.WRITABLE.toString());
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
renameUserInLDAP(ctx, testRealm, "hnelson", "hnelson2", "hnelson2@keycloak.org", "hnelson2@KEYCLOAK.ORG", "secret2");
@ -114,7 +112,7 @@ public class KerberosLdapTest extends AbstractKerberosSingleRealmTest {
// Trigger sync
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = usersSyncManager.syncAllUsers(sessionFactory, testRealm.getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(0, syncResult.getFailed());
});

View File

@ -36,7 +36,6 @@ import org.keycloak.storage.ldap.idm.model.LDAPObject;
import org.keycloak.storage.ldap.mappers.membership.LDAPGroupMapperMode;
import org.keycloak.storage.ldap.mappers.membership.role.RoleLDAPStorageMapper;
import org.keycloak.storage.ldap.mappers.membership.role.RoleMapperConfig;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.testsuite.util.LDAPRule;
import org.keycloak.testsuite.util.LDAPTestUtils;
@ -342,9 +341,7 @@ public class LDAPRoleMappingsTest extends AbstractLDAPTest {
LDAPObject john = LDAPTestUtils.addLDAPUser(ldapProvider, appRealm, "johnrolemapper", "John", "RoleMapper", "johnrolemapper@email.org", null, "1234");
LDAPTestUtils.updateLDAPPassword(ldapProvider, john, "Password1");
LDAPTestUtils.addOrUpdateRoleLDAPMappers(appRealm, ctx.getLdapModel(), LDAPGroupMapperMode.LDAP_ONLY);
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
SynchronizationResult syncResult = usersSyncManager.syncChangedUsers(session.getKeycloakSessionFactory(),
appRealm.getId(), new UserStorageProviderModel(ctx.getLdapModel()));
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(session.getKeycloakSessionFactory(), new UserStorageProviderModel(ctx.getLdapModel()));
syncResult.getAdded();
});
@ -398,9 +395,7 @@ public class LDAPRoleMappingsTest extends AbstractLDAPTest {
roleMapper.addRoleMappingInLDAP("realmRole2", johnLdap);
}
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
SynchronizationResult syncResult = usersSyncManager.syncChangedUsers(session.getKeycloakSessionFactory(),
appRealm.getId(), new UserStorageProviderModel(ctx.getLdapModel()));
UserStoragePrivateUtil.runPeriodicSync(session.getKeycloakSessionFactory(), new UserStorageProviderModel(ctx.getLdapModel()));
});
testingClient.server().run(session -> {

View File

@ -17,6 +17,7 @@
package org.keycloak.testsuite.federation.ldap;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -52,12 +53,12 @@ import org.keycloak.storage.ldap.mappers.membership.MembershipType;
import org.keycloak.storage.ldap.mappers.membership.group.GroupLDAPStorageMapper;
import org.keycloak.storage.ldap.mappers.membership.group.GroupLDAPStorageMapperFactory;
import org.keycloak.storage.ldap.mappers.membership.group.GroupMapperConfig;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.testsuite.util.LDAPRule;
import org.keycloak.testsuite.util.LDAPTestUtils;
import org.keycloak.testsuite.util.WaitUtils;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
@ -119,12 +120,6 @@ public class LDAPSyncTest extends AbstractLDAPTest {
});
}
// @Test
// public void test01runit() throws Exception {
// Thread.sleep(10000000);
// }
@Test
public void test01LDAPSync() {
// wait a bit
@ -134,9 +129,8 @@ public class LDAPSyncTest extends AbstractLDAPTest {
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = usersSyncManager.syncAllUsers(sessionFactory, ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
LDAPTestAsserts.assertSyncEquals(syncResult, 5, 0, 0, 0);
});
@ -171,7 +165,6 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
RealmModel testRealm = ctx.getRealm();
UserProvider userProvider = UserStoragePrivateUtil.userLocalStorage(session);
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
// Add user to LDAP and update 'user5' in LDAP
LDAPTestUtils.addLDAPUser(ctx.getLdapProvider(), testRealm, "user6", "User6FN", "User6LN", "user6@email.org", null, "126");
@ -185,17 +178,69 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestAsserts.assertUserImported(userProvider, testRealm, "user5", "User5FN", "User5LN", "user5@email.org", "125");
Assert.assertNull(userProvider.getUserByUsername(testRealm, "user6"));
// Trigger partial sync
// Trigger partial sync, one added and updates because a partial sync did not happen yet, only a full sync
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = usersSyncManager.syncChangedUsers(sessionFactory, testRealm.getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, ctx.getLdapModel());
LDAPTestAsserts.assertSyncEquals(syncResult, 1, 1, 0, 0);
});
Awaitility.await()
.timeout(Duration.ofSeconds(15))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
// Trigger partial sync and both user6 added and user5 updated because this is the first partial sync
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, ctx.getLdapModel());
LDAPTestAsserts.assertSyncEquals(syncResult, 0, 0, 0, 0);
});
});
WaitUtils.pause(getLDAPRule().getSleepTime());
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
RealmModel testRealm = ctx.getRealm();
LDAPObject ldapUser5 = ctx.getLdapProvider().loadLDAPUserByUsername(testRealm, "user5");
// NOTE: Changing LDAP attributes directly here
ldapUser5.setSingleAttribute(LDAPConstants.EMAIL, "user5Updated@email.org");
ldapUser5.setSingleAttribute(LDAPConstants.POSTAL_CODE, "5213");
ctx.getLdapProvider().getLdapIdentityStore().update(ldapUser5);
});
Awaitility.await()
.timeout(Duration.ofSeconds(15))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
// Trigger partial sync and one user updated since last sync
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, ctx.getLdapModel());
LDAPTestAsserts.assertSyncEquals(syncResult, 0, 1, 0, 0);
});
});
Awaitility.await()
.timeout(Duration.ofSeconds(15))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
// Trigger partial sync but nothing to sync because no changes in LDAP since the last run
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, ctx.getLdapModel());
LDAPTestAsserts.assertSyncEquals(syncResult, 0, 0, 0, 0);
});
});
testingClient.server().run(session -> {
RealmModel testRealm = session.realms().getRealmByName(TEST_REALM_NAME);
UserProvider userProvider = UserStoragePrivateUtil.userLocalStorage(session);
// Assert users updated in local provider
LDAPTestAsserts.assertUserImported(userProvider, testRealm, "user5", "User5FN", "User5LN", "user5updated@email.org", "521");
LDAPTestAsserts.assertUserImported(userProvider, testRealm, "user5", "User5FN", "User5LN", "user5updated@email.org", "5213");
LDAPTestAsserts.assertUserImported(userProvider, testRealm, "user6", "User6FN", "User6LN", "user6@email.org", "126");
});
}
@ -218,7 +263,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
// Assert syncing from LDAP fails due to duplicated username
SynchronizationResult result = new UserStorageSyncManager().syncAllUsers(session.getKeycloakSessionFactory(), ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult result = UserStoragePrivateUtil.runFullSync(session.getKeycloakSessionFactory(), ctx.getLdapModel());
Assert.assertEquals(1, result.getFailed());
// Remove "user7" from LDAP
@ -233,7 +278,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
// Assert syncing from LDAP fails due to duplicated email
SynchronizationResult result = new UserStorageSyncManager().syncAllUsers(session.getKeycloakSessionFactory(), ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult result = UserStoragePrivateUtil.runFullSync(session.getKeycloakSessionFactory(), ctx.getLdapModel());
Assert.assertEquals(1, result.getFailed());
Assert.assertNull(UserStoragePrivateUtil.userLocalStorage(session).getUserByUsername(ctx.getRealm(), "user7-something"));
@ -243,7 +288,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
ctx.getLdapProvider().getLdapIdentityStore().update(duplicatedLdapUser);
// Assert user successfully synced now
result = new UserStorageSyncManager().syncAllUsers(session.getKeycloakSessionFactory(), ctx.getRealm().getId(), ctx.getLdapModel());
result = UserStoragePrivateUtil.runFullSync(session.getKeycloakSessionFactory(), ctx.getLdapModel());
Assert.assertEquals(0, result.getFailed());
});
@ -290,7 +335,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
SynchronizationResult result = new UserStorageSyncManager().syncAllUsers(session.getKeycloakSessionFactory(), ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult result = UserStoragePrivateUtil.runFullSync(session.getKeycloakSessionFactory(), ctx.getLdapModel());
Assert.assertEquals(0, result.getFailed());
});
}
@ -304,14 +349,13 @@ public class LDAPSyncTest extends AbstractLDAPTest {
// Add user to LDAP
LDAPTestUtils.addLDAPUser(ctx.getLdapProvider(), ctx.getRealm(), "beckybecks", "Becky", "Becks", "becky-becks@email.org", null, "123");
SynchronizationResult syncResult = new UserStorageSyncManager().syncAllUsers(sessionFactory, ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(0, syncResult.getFailed());
});
testingClient.server().run(session -> {
LDAPTestContext ctx = LDAPTestContext.init(session);
RealmModel testRealm = ctx.getRealm();
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
// Update user 'beckybecks' in LDAP
LDAPObject ldapUser = ctx.getLdapProvider().loadLDAPUserByUsername(testRealm, "beckybecks");
@ -326,7 +370,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
// Trigger partial sync
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = usersSyncManager.syncChangedUsers(sessionFactory, testRealm.getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(0, syncResult.getFailed());
});
@ -375,7 +419,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = new UserStorageSyncManager().syncAllUsers(sessionFactory, ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(0, syncResult.getFailed());
});
@ -446,7 +490,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = new UserStorageSyncManager().syncAllUsers(sessionFactory, ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(1, syncResult.getAdded());
Assert.assertTrue(syncResult.getFailed() > 0);
UserModel invalidUser = session.users().getUserByUsername(ctx.getRealm(), "user8street");
@ -635,7 +679,7 @@ public class LDAPSyncTest extends AbstractLDAPTest {
LDAPTestContext ctx = LDAPTestContext.init(session);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = new UserStorageSyncManager().syncAllUsers(sessionFactory, ctx.getRealm().getId(), ctx.getLdapModel());
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, ctx.getLdapModel());
Assert.assertEquals(2, syncResult.getAdded());
});

View File

@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.storage.StoreSyncEvent;
import org.keycloak.storage.UserStoragePrivateUtil;
import org.keycloak.storage.UserStorageProvider;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.testsuite.AbstractAuthTest;
import org.keycloak.testsuite.auth.page.AuthRealm;
import org.keycloak.testsuite.federation.DummyUserFederationProviderFactory;
import org.keycloak.timer.TimerProvider;
import org.jboss.logging.Logger;
import org.junit.Assert;
@ -95,12 +95,12 @@ public class SyncFederationTest extends AbstractAuthTest {
sleep(1800);
// Cancel timer
UserStorageSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
StoreSyncEvent.fire(session, appRealm, dummyModel, true);
log.infof("Notified sync manager about cancel periodic sync");
// This sync is here just to ensure that we have lock (doublecheck that periodic sync, which was possibly triggered before canceling timer is finished too)
while (true) {
SynchronizationResult result = UserStorageSyncManager.syncChangedUsers(session.getKeycloakSessionFactory(), appRealm.getId(), dummyModel);
SynchronizationResult result = UserStoragePrivateUtil.runPeriodicSync(session.getKeycloakSessionFactory(), dummyModel);
if (result.isIgnored()) {
log.infof("Still waiting for lock before periodic sync is finished", result.toString());
sleep(1000);
@ -182,9 +182,6 @@ public class SyncFederationTest extends AbstractAuthTest {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
DummyUserFederationProviderFactory dummyFedFactory = (DummyUserFederationProviderFactory) sessionFactory.getProviderFactory(UserStorageProvider.class, DummyUserFederationProviderFactory.PROVIDER_NAME);
// Assert that after some period was DummyUserFederationProvider triggered
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
// Assert that dummy provider wasn't invoked anymore
sleep(1800);
@ -213,16 +210,15 @@ public class SyncFederationTest extends AbstractAuthTest {
DummyUserFederationProviderFactory dummyFedFactory = (DummyUserFederationProviderFactory) sessionFactory.getProviderFactory(UserStorageProvider.class, DummyUserFederationProviderFactory.PROVIDER_NAME);
// Assert that after some period was DummyUserFederationProvider triggered
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
sleep(1800);
// Cancel timer
usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
StoreSyncEvent.fire(session, appRealm, dummyModel, true);
log.infof("Notified sync manager about cancel periodic sync");
// This sync is here just to ensure that we have lock (doublecheck that periodic sync, which was possibly triggered before canceling timer is finished too)
while (true) {
SynchronizationResult result = usersSyncManager.syncChangedUsers(session.getKeycloakSessionFactory(), appRealm.getId(), dummyModel);
SynchronizationResult result = UserStoragePrivateUtil.runPeriodicSync(session.getKeycloakSessionFactory(), dummyModel);
if (result.isIgnored()) {
log.infof("Still waiting for lock before periodic sync is finished", result.toString());
sleep(1000);
@ -258,7 +254,7 @@ public class SyncFederationTest extends AbstractAuthTest {
@Test
public void test03ConcurrentSync() throws Exception {
public void test03ConcurrentSync() {
// Enable timer for SyncDummyUserFederationProvider
testingClient.server().run(session -> {
SyncDummyUserFederationProviderFactory.restartLatches();
@ -280,21 +276,16 @@ public class SyncFederationTest extends AbstractAuthTest {
testingClient.server().run(session -> {
RealmModel appRealm = session.realms().getRealmByName(AuthRealm.TEST);
UserStorageProviderModel dummyModel = findDummyProviderModel(appRealm);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
// bootstrap periodic sync
UserStorageSyncManager usersSyncManager = new UserStorageSyncManager();
usersSyncManager.bootstrapPeriodic(sessionFactory, session.getProvider(TimerProvider.class));
// Wait and then trigger sync manually. Assert it will be ignored
try {
SyncDummyUserFederationProviderFactory.latchStarted.await(20000, TimeUnit.MILLISECONDS);
SynchronizationResult syncResult = usersSyncManager.syncChangedUsers(sessionFactory, appRealm.getId(), dummyModel);
SynchronizationResult syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, dummyModel);
Assert.assertTrue(syncResult.isIgnored());
// Cancel timer
usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
StoreSyncEvent.fire(session, appRealm, dummyModel, true);
// Signal to factory to finish waiting
SyncDummyUserFederationProviderFactory.latchWait.countDown();
@ -304,7 +295,7 @@ public class SyncFederationTest extends AbstractAuthTest {
// This sync is here just to ensure that we have lock (doublecheck that periodic sync, which was possibly triggered before canceling timer is finished too)
while (true) {
SynchronizationResult result = usersSyncManager.syncChangedUsers(session.getKeycloakSessionFactory(), appRealm.getId(), dummyModel);
SynchronizationResult result = UserStoragePrivateUtil.runPeriodicSync(session.getKeycloakSessionFactory(), dummyModel);
if (result.isIgnored()) {
log.infof("Still waiting for lock before periodic sync is finished: %s", result.toString());
sleep(1000);
@ -345,9 +336,9 @@ public class SyncFederationTest extends AbstractAuthTest {
RealmModel appRealm = session.realms().getRealmByName(AuthRealm.TEST);
UserStorageProviderModel dummyModel = findDummyProviderModel(appRealm);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = UserStorageSyncManager.syncAllUsers(sessionFactory, appRealm.getId(), dummyModel);
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, dummyModel);
Assert.assertTrue(syncResult.isIgnored());
syncResult = UserStorageSyncManager.syncChangedUsers(sessionFactory, appRealm.getId(), dummyModel);
syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, dummyModel);
Assert.assertTrue(syncResult.isIgnored());
});
@ -386,9 +377,9 @@ public class SyncFederationTest extends AbstractAuthTest {
RealmModel appRealm = session.realms().getRealmByName(AuthRealm.TEST);
UserStorageProviderModel dummyModel = findDummyProviderModel(appRealm);
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
SynchronizationResult syncResult = UserStorageSyncManager.syncAllUsers(sessionFactory, appRealm.getId(), dummyModel);
SynchronizationResult syncResult = UserStoragePrivateUtil.runFullSync(sessionFactory, dummyModel);
Assert.assertTrue(syncResult.isIgnored());
syncResult = UserStorageSyncManager.syncChangedUsers(sessionFactory, appRealm.getId(), dummyModel);
syncResult = UserStoragePrivateUtil.runPeriodicSync(sessionFactory, dummyModel);
Assert.assertTrue(syncResult.isIgnored());
});

View File

@ -44,7 +44,6 @@ import org.keycloak.storage.ldap.idm.store.ldap.LDAPOperationManager;
import org.keycloak.storage.ldap.mappers.LDAPStorageMapper;
import org.keycloak.storage.ldap.mappers.UserAttributeLDAPStorageMapper;
import org.keycloak.storage.ldap.mappers.UserAttributeLDAPStorageMapperFactory;
import org.keycloak.storage.managers.UserStorageSyncManager;
import org.keycloak.storage.user.ImportSynchronization;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.testsuite.model.KeycloakModelTest;
@ -139,7 +138,7 @@ public class UserSyncTest extends KeycloakModelTest {
long start = System.currentTimeMillis();
SynchronizationResult res = withRealm(realmId, (session, realm) -> {
UserStorageProviderModel providerModel = new UserStorageProviderModel(realm.getComponent(userFederationId));
return UserStorageSyncManager.syncAllUsers(session.getKeycloakSessionFactory(), realm.getId(), providerModel);
return UserStoragePrivateUtil.runFullSync(session.getKeycloakSessionFactory(), providerModel);
});
long end = System.currentTimeMillis();
long timeNeeded = end - start;

View File

@ -23,8 +23,8 @@ import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.StorageProviderRealmModel;
import org.keycloak.storage.StoreSyncEvent;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.managers.UserStorageSyncManager;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -57,7 +57,7 @@ public class SyncDummyFederationProviderCommand extends AbstractCommand {
realm.updateComponent(fedProviderModel);
}
new UserStorageSyncManager().notifyToRefreshPeriodicSync(session, realm, fedProviderModel, false);
StoreSyncEvent.fire(session, realm, fedProviderModel, false);
log.infof("User federation provider created and sync was started", waitTime);
}