Register new protocols to avoid exceptions on startup

Closes #43337

Signed-off-by: Alexander Schwartz <alexander.schwartz@ibm.com>
This commit is contained in:
Alexander Schwartz 2025-10-10 12:43:13 +02:00 committed by GitHub
parent 0c3a042029
commit cce230818e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 296 additions and 0 deletions

View File

@ -0,0 +1,98 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.jgroups.header;
import org.jgroups.Header;
import org.jgroups.util.Util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
/**
* Header which carries an OpenTelemetry {@link io.opentelemetry.api.trace.Span} between requests and responses
*
* @author Bela Ban
* @since 1.0.0
*/
public class TracerHeader extends Header {
public static final short ID = 1050;
protected final Map<String, String> ctx = new HashMap<>();
public TracerHeader() {
}
public short getMagicId() {
return ID;
}
public Supplier<? extends Header> create() {
return TracerHeader::new;
}
public void put(String key, String value) {
ctx.put(key, value);
}
public String get(String key) {
return ctx.get(key);
}
public Set<String> keys() {
return ctx.keySet();
}
public int serializedSize() {
int size = Integer.BYTES;
int num_attrs = ctx.size();
if (num_attrs > 0) {
for (Map.Entry<String, String> entry : ctx.entrySet()) {
String key = entry.getKey();
String val = entry.getValue();
size += Util.size(key) + Util.size(val);
}
}
return size;
}
public void writeTo(DataOutput out) throws IOException {
out.writeInt(ctx.size());
if (!ctx.isEmpty()) {
for (Map.Entry<String, String> e : ctx.entrySet()) {
Util.writeString(e.getKey(), out);
Util.writeString(e.getValue(), out);
}
}
}
public void readFrom(DataInput in) throws IOException {
int size = in.readInt();
if (size > 0) {
for (int i = 0; i < size; i++)
ctx.put(Util.readString(in), Util.readString(in));
}
}
public String toString() {
return ctx.toString();
}
}

View File

@ -0,0 +1,194 @@
/*
* Copyright 2025 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.jgroups.protocol;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.jgroups.Message;
import org.jgroups.Version;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.keycloak.jgroups.header.TracerHeader;
import java.util.ArrayList;
import java.util.List;
/**
* Provides Open Telemetry (https://opentelemetry.io/) tracing for JGroups. It should be placed just above the
* transport.<br/>
* When a message is sent, a {@link TracerHeader} is added with the (optional) parent span.
* When received a new span is started (as a child span, if the parent span in the header is non-null), and ended when
* the the thread returns.
*
* @author Bela Ban
* @since 1.0.0
*/
@MBean(description = "Records OpenTelemetry traces of sent and received messages")
public class OPEN_TELEMETRY extends Protocol {
public static final short OPEN_TELEMETRY_ID = 1026;
protected OpenTelemetry otel;
protected Tracer tracer;
@Property(description = "When active, traces are recorded, otherwise not")
protected boolean active = true;
public boolean active() {
return active;
}
public OPEN_TELEMETRY active(boolean f) {
active = activate(f);
return this;
}
public void start() throws Exception {
super.start();
activate(active);
}
public Object down(Message msg) {
if (!active || !Span.current().getSpanContext().isValid())
return down_prot.down(msg);
SpanBuilder spanBuilder = tracer.spanBuilder("JGroups.sendSingleMessage");
if (Span.current().isRecording()) {
if (msg.getDest() != null) {
spanBuilder.setAttribute("kc.jgroups.dest", msg.getDest().toString());
}
if (msg.getSrc() != null) {
spanBuilder.setAttribute("kc.jgroups.src", msg.getSrc().toString());
}
}
Span span = spanBuilder.startSpan();
try (var ignored = span.makeCurrent()) {
TracerHeader hdr = new TracerHeader();
populateHeader(hdr); // will populate if a span exists (created by the caller)
msg.putHeader(OPEN_TELEMETRY_ID, hdr);
return down_prot.down(msg);
} catch (Throwable t) {
span.setStatus(StatusCode.ERROR, String.format("failed delivering single message to %s", msg.dest()));
span.recordException(t);
throw t;
} finally {
span.end();
}
}
public Object up(Message msg) {
if (!active)
return up_prot.up(msg);
TracerHeader hdr = msg.getHeader(OPEN_TELEMETRY_ID);
if (hdr != null) {
Context extractedContext = otel.getPropagators().getTextMapPropagator()
.extract(Context.current(), hdr, TEXT_MAP_GETTER);
Span span = tracer.spanBuilder("JGroups.deliverSingleMessage")
.setSpanKind(SpanKind.SERVER)
.setParent(extractedContext).startSpan();
try (Scope ignored = span.makeCurrent()) {
span.setAttribute("from", msg.src().toString());
return up_prot.up(msg);
} catch (Throwable t) {
span.setStatus(StatusCode.ERROR, String.format("failed delivering single message from %s", msg.src()));
span.recordException(t);
throw t;
} finally {
span.end();
}
} else {
return up_prot.up(msg);
}
}
public void up(MessageBatch batch) {
if (!active) {
if (!batch.isEmpty())
up_prot.up(batch);
return;
}
List<Span> spans = new ArrayList<>(batch.size());
int index = 0, batch_size = batch.size();
for (Message msg : batch) {
index++;
TracerHeader hdr = msg.getHeader(OPEN_TELEMETRY_ID);
if (hdr != null) {
Context extractedContext = otel.getPropagators().getTextMapPropagator()
.extract(Context.current(), hdr, TEXT_MAP_GETTER);
Span span = tracer.spanBuilder("deliver-batched-msg")
.setSpanKind(SpanKind.SERVER)
.setParent(extractedContext).startSpan();
span.setAttribute("batch-msg", String.format("%d/%d", index, batch_size));
spans.add(span);
}
}
try {
if (!batch.isEmpty())
up_prot.up(batch);
} catch (Throwable t) {
spans.forEach(s -> {
s.setStatus(StatusCode.ERROR, String.format("failed delivering batched message from %s", batch.sender()))
.recordException(t);
});
throw t;
} finally {
spans.forEach(Span::end);
}
}
protected static void populateHeader(TracerHeader hdr) {
// Inject the request with the *current* Context, which contains our current Span.
W3CTraceContextPropagator.getInstance().inject(Context.current(), hdr, (carrier, key, val) -> hdr.put(key, val));
}
protected static final TextMapGetter<TracerHeader> TEXT_MAP_GETTER =
new TextMapGetter<>() {
@Override
public String get(TracerHeader carrier, String key) {
return carrier.get(key);
}
@Override
public Iterable<String> keys(TracerHeader carrier) {
return carrier.keys();
}
};
protected boolean activate(boolean flag) {
if (flag && otel == null)
otel = GlobalOpenTelemetry.get();
if (flag && tracer == null)
tracer = otel.getTracer("org.jgroups.trace", Version.printVersion());
return flag;
}
}

View File

@ -31,6 +31,8 @@ import org.jgroups.conf.ProtocolConfiguration;
import org.jgroups.protocols.JDBC_PING2;
import org.keycloak.connections.jpa.JpaConnectionProvider;
import org.keycloak.connections.jpa.util.JpaUtils;
import org.keycloak.jgroups.header.TracerHeader;
import org.keycloak.jgroups.protocol.OPEN_TELEMETRY;
import org.keycloak.models.KeycloakSession;
import org.keycloak.quarkus.runtime.storage.infinispan.CacheManagerFactory;
import org.keycloak.quarkus.runtime.storage.infinispan.jgroups.JGroupsStackConfigurator;
@ -89,6 +91,8 @@ public class JGroupsJdbcPingStackConfigurator implements JGroupsStackConfigurato
// Use custom Keycloak JDBC_PING implementation that workarounds issue https://issues.redhat.com/browse/JGRP-2870
// The id 1025 follows this instruction: https://github.com/belaban/JGroups/blob/38219e9ec1c629fa2f7929e3b53d1417d8e60b61/conf/jg-protocol-ids.xml#L85
ClassConfigurator.addProtocol((short) 1025, KEYCLOAK_JDBC_PING2.class);
ClassConfigurator.addProtocol((short) 1026, OPEN_TELEMETRY.class);
ClassConfigurator.add(TracerHeader.ID, TracerHeader.class);
return List.of(new ProtocolConfiguration(KEYCLOAK_JDBC_PING2.class.getName(), attributes));
}