Find highest sequence number in jgroups_ping

* Find the highest sequence number in jgroups_ping table to avoid duplicates

Fixes #44189

Signed-off-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
Co-authored-by: Pedro Ruivo <1492066+pruivo@users.noreply.github.com>
This commit is contained in:
Pedro Ruivo 2025-11-14 19:54:52 +00:00 committed by GitHub
parent 8d0b64bd59
commit 6260622f2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -22,6 +22,7 @@ import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -47,6 +48,7 @@ import org.keycloak.jgroups.header.TracerHeader;
import org.keycloak.jgroups.protocol.KEYCLOAK_JDBC_PING2;
import org.keycloak.jgroups.protocol.OPEN_TELEMETRY;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
@ -262,22 +264,50 @@ public final class JGroupsConfigurator {
* one second. This prevents a split-brain scenario on a concurrent startup.
*/
private static Address prepareJGroupsAddress(KeycloakSession session, String clusterName) {
var storage = session.getProvider(ServerConfigStorageProvider.class);
String seq = storage.loadOrCreate(JGROUPS_ADDRESS_SEQUENCE, () -> "0");
long value = Long.parseLong(seq) + 1;
String newSeq = Long.toString(value);
storage.replace(JGROUPS_ADDRESS_SEQUENCE, seq, newSeq);
var cp = session.getProvider(JpaConnectionProvider.class);
var tableName = JpaUtils.getTableNameForNativeQuery("JGROUPS_PING", cp.getEntityManager());
String statement = String.format("INSERT INTO %s values (?, ?, ?, ?, ?)", tableName);
ExtendedUUID address = new ExtendedUUID(0, value);
long highestSequence = findHighestSequenceInTable(cp, clusterName, tableName);
long mySequence = getNextSequence(session.getKeycloakSessionFactory(), highestSequence);
return insertSequenceInTable(cp, clusterName, tableName, mySequence);
}
private static long findHighestSequenceInTable(JpaConnectionProvider cp, String clusterName, String tableName) {
return cp.getEntityManager().<Connection, Long>callWithConnection(con -> {
long maxSequence = -1;
try (PreparedStatement s = con.prepareStatement("SELECT address FROM %s WHERE cluster_name=?".formatted(tableName))) {
s.setString(1, clusterName);
try (ResultSet resultSet = s.executeQuery()) {
while (resultSet.next()) {
String uuid = resultSet.getString(1);
Address addr = org.jgroups.util.Util.addressFromString(uuid);
if (addr instanceof UUID uuidAddr && uuidAddr.getMostSignificantBits() == 0 && uuidAddr.getLeastSignificantBits() > maxSequence) {
maxSequence = uuidAddr.getLeastSignificantBits();
}
}
}
}
return maxSequence;
});
}
cp.getEntityManager().runWithConnection(o -> {
Connection con = (Connection) o;
try (PreparedStatement s = con.prepareStatement(statement)) {
private static long getNextSequence(KeycloakSessionFactory sf, long highestSequence) {
// Run this in a separate transaction so that we already write the new ID to the database.
// This helps us in case there is an inconsistency if some old restored database and some inconsistent nodes writing
// their node IDs to the table. Given this is being called in a retry loop, we'll be making some progress.
return KeycloakModelUtils.runJobInTransactionWithResult(sf, session -> {
var storage = session.getProvider(ServerConfigStorageProvider.class);
String seq = storage.loadOrCreate(JGROUPS_ADDRESS_SEQUENCE, () -> "0");
long value = Math.max(highestSequence + 1, Long.parseLong(seq) + 1);
storage.replace(JGROUPS_ADDRESS_SEQUENCE, seq, Long.toString(value));
return value;
});
}
private static Address insertSequenceInTable(JpaConnectionProvider cp, String clusterName, String tableName, long mySequence) {
ExtendedUUID address = new ExtendedUUID(0, mySequence);
cp.getEntityManager().<Connection>runWithConnection(con -> {
try (PreparedStatement s = con.prepareStatement("INSERT INTO %s values (?, ?, ?, ?, ?)".formatted(tableName))) {
s.setString(1, org.jgroups.util.Util.addressToString(new UUID(address.getMostSignificantBits(), address.getLeastSignificantBits()))); // address
s.setString(2, "(starting)"); // name
s.setString(3, clusterName); // cluster name
@ -286,7 +316,6 @@ public final class JGroupsConfigurator {
s.execute();
}
});
return address;
}