Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and options for gRPC instrumentation #13443

Merged
merged 4 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void grpcInstrumentation() {
.hasAttributesSatisfyingExactly(
equalTo(
MessageIncubatingAttributes.MESSAGE_TYPE, "RECEIVED"),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L))),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L))),
span ->
span.hasName("example.Greeter/SayHello")
.hasKind(SpanKind.SERVER)
Expand All @@ -123,6 +123,6 @@ void grpcInstrumentation() {
.hasName("message")
.hasAttributesSatisfyingExactly(
equalTo(MessageIncubatingAttributes.MESSAGE_TYPE, "SENT"),
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 2L)))));
equalTo(MessageIncubatingAttributes.MESSAGE_ID, 1L)))));
}
}
1 change: 1 addition & 0 deletions instrumentation/grpc-1.6/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

| System property | Type | Default | Description |
|-------------------------------------------------------------|---------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `otel.instrumentation.grpc.emit-message-events` | Boolean | `true` | Determines whether to emit span event for each individual message received and sent. |
| `otel.instrumentation.grpc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
| `otel.instrumentation.grpc.capture-metadata.client.request` | String | | A comma-separated list of request metadata keys. gRPC client instrumentation will capture metadata values corresponding to configured keys as span attributes. |
| `otel.instrumentation.grpc.capture-metadata.server.request` | String | | A comma-separated list of request metadata keys. gRPC server instrumentation will capture metadata values corresponding to configured keys as span attributes. |
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public final class GrpcSingletons {
private static final AtomicReference<Context.Storage> STORAGE_REFERENCE = new AtomicReference<>();

static {
boolean emitMessageEvents =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.grpc.emit-message-events", true);

boolean experimentalSpanAttributes =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.grpc.experimental-span-attributes", false);
Expand All @@ -40,6 +44,7 @@ public final class GrpcSingletons {

GrpcTelemetry telemetry =
GrpcTelemetry.builder(GlobalOpenTelemetry.get())
.setEmitMessageEvents(emitMessageEvents)
.setCaptureExperimentalSpanAttributes(experimentalSpanAttributes)
.setCapturedClientRequestMetadata(clientRequestMetadata)
.setCapturedServerRequestMetadata(serverRequestMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,36 @@ public static GrpcTelemetryBuilder builder(OpenTelemetry openTelemetry) {
private final Instrumenter<GrpcRequest, Status> clientInstrumenter;
private final ContextPropagators propagators;
private final boolean captureExperimentalSpanAttributes;
private final boolean emitMessageEvents;

GrpcTelemetry(
Instrumenter<GrpcRequest, Status> serverInstrumenter,
Instrumenter<GrpcRequest, Status> clientInstrumenter,
ContextPropagators propagators,
boolean captureExperimentalSpanAttributes) {
boolean captureExperimentalSpanAttributes,
boolean emitMessageEvents) {
this.serverInstrumenter = serverInstrumenter;
this.clientInstrumenter = clientInstrumenter;
this.propagators = propagators;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.emitMessageEvents = emitMessageEvents;
}

/**
* Returns a new {@link ClientInterceptor} for use with methods like {@link
* io.grpc.ManagedChannelBuilder#intercept(ClientInterceptor...)}.
*/
public ClientInterceptor newClientInterceptor() {
return new TracingClientInterceptor(clientInstrumenter, propagators);
return new TracingClientInterceptor(
clientInstrumenter, propagators, captureExperimentalSpanAttributes, emitMessageEvents);
}

/**
* Returns a new {@link ServerInterceptor} for use with methods like {@link
* io.grpc.ServerBuilder#intercept(ServerInterceptor)}.
*/
public ServerInterceptor newServerInterceptor() {
return new TracingServerInterceptor(serverInstrumenter, captureExperimentalSpanAttributes);
return new TracingServerInterceptor(
serverInstrumenter, captureExperimentalSpanAttributes, emitMessageEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class GrpcTelemetryBuilder {
additionalServerExtractors = new ArrayList<>();

private boolean captureExperimentalSpanAttributes;
private boolean emitMessageEvents = true;
private List<String> capturedClientRequestMetadata = Collections.emptyList();
private List<String> capturedServerRequestMetadata = Collections.emptyList();

Expand Down Expand Up @@ -130,6 +131,16 @@ public GrpcTelemetryBuilder setPeerService(String peerService) {
return this;
}

/**
* Determines whether to add span event for each individual message received and sent. The default
* is true. Set this to false in case of streaming large volumes of messages.
*/
@CanIgnoreReturnValue
public GrpcTelemetryBuilder setEmitMessageEvents(boolean emitMessageEvents) {
this.emitMessageEvents = emitMessageEvents;
return this;
}

/**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by
Expand Down Expand Up @@ -211,6 +222,7 @@ public GrpcTelemetry build() {
// So we go ahead and inject manually in this instrumentation.
clientInstrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysClient()),
openTelemetry.getPropagators(),
captureExperimentalSpanAttributes);
captureExperimentalSpanAttributes,
emitMessageEvents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

final class TracingClientInterceptor implements ClientInterceptor {

private static final AttributeKey<Long> GRPC_RECEIVED_MESSAGE_COUNT =
AttributeKey.longKey("grpc.received.message_count");
private static final AttributeKey<Long> GRPC_SENT_MESSAGE_COUNT =
AttributeKey.longKey("grpc.sent.message_count");
// copied from MessageIncubatingAttributes
private static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
private static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
Expand All @@ -34,16 +38,27 @@ final class TracingClientInterceptor implements ClientInterceptor {
private static final String RECEIVED = "RECEIVED";

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingClientCall> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "messageId");
private static final AtomicLongFieldUpdater<TracingClientCall> SENT_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "sentMessageId");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingClientCall> RECEIVED_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingClientCall.class, "receivedMessageId");

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final ContextPropagators propagators;
private final boolean captureExperimentalSpanAttributes;
private final boolean emitMessageEvents;

TracingClientInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, ContextPropagators propagators) {
Instrumenter<GrpcRequest, Status> instrumenter,
ContextPropagators propagators,
boolean captureExperimentalSpanAttributes,
boolean emitMessageEvents) {
this.instrumenter = instrumenter;
this.propagators = propagators;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.emitMessageEvents = emitMessageEvents;
}

@Override
Expand Down Expand Up @@ -75,9 +90,13 @@ final class TracingClientCall<REQUEST, RESPONSE>
private final Context context;
private final GrpcRequest request;

// Used by MESSAGE_ID_UPDATER
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;

// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;
volatile long receivedMessageId;

TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate,
Expand Down Expand Up @@ -113,10 +132,11 @@ public void sendMessage(REQUEST message) {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
span.addEvent("message", attributes);
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
}

final class TracingClientCallListener
Expand All @@ -139,14 +159,11 @@ final class TracingClientCallListener

@Override
public void onMessage(RESPONSE message) {
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(
MESSAGE_TYPE,
RECEIVED,
MESSAGE_ID,
MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this));
span.addEvent("message", attributes);
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingClientCall.this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
}
Expand All @@ -155,6 +172,13 @@ public void onMessage(RESPONSE message) {
@Override
public void onClose(Status status, Metadata trailers) {
request.setPeerSocketAddress(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
span.setAttribute(
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingClientCall.this));
}
instrumenter.end(context, request, status, status.getCause());
try (Scope ignored = parentContext.makeCurrent()) {
delegate().onClose(status, trailers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

final class TracingServerInterceptor implements ServerInterceptor {

private static final AttributeKey<Boolean> GRPC_CANCELED =
AttributeKey.booleanKey("grpc.canceled");
private static final AttributeKey<Long> GRPC_RECEIVED_MESSAGE_COUNT =
AttributeKey.longKey("grpc.received.message_count");
private static final AttributeKey<Long> GRPC_SENT_MESSAGE_COUNT =
AttributeKey.longKey("grpc.sent.message_count");
// copied from MessageIncubatingAttributes
private static final AttributeKey<Long> MESSAGE_ID = AttributeKey.longKey("message.id");
private static final AttributeKey<String> MESSAGE_TYPE = AttributeKey.stringKey("message.type");
Expand All @@ -33,19 +39,27 @@ final class TracingServerInterceptor implements ServerInterceptor {
private static final String RECEIVED = "RECEIVED";

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingServerCall> MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "messageId");
private static final AtomicLongFieldUpdater<TracingServerCall> SENT_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "sentMessageId");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<TracingServerCall> RECEIVED_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "receivedMessageId");

private static final Metadata.Key<String> AUTHORITY_KEY =
InternalMetadata.keyOf(":authority", Metadata.ASCII_STRING_MARSHALLER);

private final Instrumenter<GrpcRequest, Status> instrumenter;
private final boolean captureExperimentalSpanAttributes;
private final boolean emitMessageEvents;

TracingServerInterceptor(
Instrumenter<GrpcRequest, Status> instrumenter, boolean captureExperimentalSpanAttributes) {
Instrumenter<GrpcRequest, Status> instrumenter,
boolean captureExperimentalSpanAttributes,
boolean emitMessageEvents) {
this.instrumenter = instrumenter;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.emitMessageEvents = emitMessageEvents;
}

@Override
Expand Down Expand Up @@ -85,9 +99,13 @@ final class TracingServerCall<REQUEST, RESPONSE>
private final GrpcRequest request;
private Status status;

// Used by MESSAGE_ID_UPDATER
// Used by SENT_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long sentMessageId;

// Used by RECEIVED_MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;
volatile long receivedMessageId;

TracingServerCall(
ServerCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
Expand All @@ -106,10 +124,11 @@ public void sendMessage(RESPONSE message) {
try (Scope ignored = context.makeCurrent()) {
super.sendMessage(message);
}
Span span = Span.fromContext(context);
Attributes attributes =
Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, MESSAGE_ID_UPDATER.incrementAndGet(this));
span.addEvent("message", attributes);
long messageId = SENT_MESSAGE_ID_UPDATER.incrementAndGet(this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, SENT, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
}

@Override
Expand All @@ -134,15 +153,27 @@ final class TracingServerCallListener
this.request = request;
}

private void end(Context context, GrpcRequest request, Status response, Throwable error) {
if (captureExperimentalSpanAttributes) {
Span span = Span.fromContext(context);
span.setAttribute(
GRPC_RECEIVED_MESSAGE_COUNT, RECEIVED_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
span.setAttribute(
GRPC_SENT_MESSAGE_COUNT, SENT_MESSAGE_ID_UPDATER.get(TracingServerCall.this));
if (Status.CANCELLED.equals(status)) {
span.setAttribute(GRPC_CANCELED, true);
}
}
instrumenter.end(context, request, response, error);
}

@Override
public void onMessage(REQUEST message) {
Attributes attributes =
Attributes.of(
MESSAGE_TYPE,
RECEIVED,
MESSAGE_ID,
MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this));
Span.fromContext(context).addEvent("message", attributes);
long messageId = RECEIVED_MESSAGE_ID_UPDATER.incrementAndGet(TracingServerCall.this);
if (emitMessageEvents) {
Attributes attributes = Attributes.of(MESSAGE_TYPE, RECEIVED, MESSAGE_ID, messageId);
Span.fromContext(context).addEvent("message", attributes);
}
delegate().onMessage(message);
}

Expand All @@ -160,36 +191,33 @@ public void onHalfClose() {
public void onCancel() {
try {
delegate().onCancel();
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute("grpc.canceled", true);
}
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
instrumenter.end(context, request, Status.CANCELLED, null);
end(context, request, Status.CANCELLED, null);
}

@Override
public void onComplete() {
try {
delegate().onComplete();
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
if (status == null) {
status = Status.UNKNOWN;
}
instrumenter.end(context, request, status, status.getCause());
end(context, request, status, status.getCause());
}

@Override
public void onReady() {
try {
delegate().onReady();
} catch (Throwable e) {
instrumenter.end(context, request, Status.UNKNOWN, e);
end(context, request, Status.UNKNOWN, e);
throw e;
}
}
Expand Down
Loading
Loading