Skip to content

Commit 2dff526

Browse files
authored
feat: Fix context prop for netty instrumentation (#426)
* netty-4.1's fix to correct context prop * netty-4.0's fix to correct context prop * spotless apply
1 parent 680b391 commit 2dff526

File tree

6 files changed

+327
-69
lines changed

6 files changed

+327
-69
lines changed

instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientRequestTracingHandler;
3737
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientResponseTracingHandler;
3838
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientTracingHandler;
39+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
3940
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerBlockingRequestHandler;
4041
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerRequestTracingHandler;
4142
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerResponseTracingHandler;
@@ -135,14 +136,13 @@ public static void addHandler(
135136
HttpClientTracingHandler.class.getName(),
136137
new HttpClientTracingHandler());
137138

138-
// add OTEL request handler to start spans
139+
// add our custom request handler to start spans with proper context propagation
139140
pipeline.addAfter(
140141
HttpClientTracingHandler.class.getName(),
141142
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
142143
.HttpClientRequestTracingHandler.class
143144
.getName(),
144-
new io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
145-
.HttpClientRequestTracingHandler());
145+
new OtelHttpClientRequestTracingHandler());
146146
} else if (handler instanceof HttpRequestEncoder) {
147147
pipeline.addLast(
148148
HttpClientRequestTracingHandler.class.getName(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client;
18+
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelPromise;
21+
import io.netty.handler.codec.http.HttpRequest;
22+
import io.opentelemetry.api.trace.Span;
23+
import io.opentelemetry.api.trace.SpanContext;
24+
import io.opentelemetry.context.Context;
25+
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;
26+
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
29+
/**
30+
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
31+
* propagation by using Context.current() as the parent context.
32+
*/
33+
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {
34+
35+
// Store the server context for each thread
36+
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();
37+
38+
// Store the mapping from thread ID to server span context (for cross-thread scenarios)
39+
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
40+
new ConcurrentHashMap<>();
41+
42+
// Maximum size for the thread map before triggering cleanup
43+
private static final int MAX_THREAD_MAP_SIZE = 1000;
44+
45+
// Cleanup flag to avoid excessive synchronized blocks
46+
private static volatile boolean cleanupNeeded = false;
47+
48+
public OtelHttpClientRequestTracingHandler() {
49+
super();
50+
}
51+
52+
/**
53+
* Stores the current context as the server context for this thread. This should be called from
54+
* the server handler.
55+
*/
56+
public static void storeServerContext(Context context) {
57+
SERVER_CONTEXT.set(context);
58+
59+
// Also store the span context by thread ID for cross-thread lookup
60+
Span span = Span.fromContext(context);
61+
if (span != null && span.getSpanContext().isValid()) {
62+
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());
63+
64+
// Check if we need to clean up the map
65+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
66+
cleanupNeeded = true;
67+
}
68+
}
69+
}
70+
71+
/**
72+
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
73+
* block to prevent concurrent modification issues.
74+
*/
75+
private static void cleanupThreadMapIfNeeded() {
76+
if (cleanupNeeded) {
77+
synchronized (THREAD_TO_SPAN_CONTEXT) {
78+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
79+
THREAD_TO_SPAN_CONTEXT.clear();
80+
cleanupNeeded = false;
81+
}
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
88+
try {
89+
if (!(msg instanceof HttpRequest)) {
90+
super.write(ctx, msg, prm);
91+
return;
92+
}
93+
94+
Context parentContext = SERVER_CONTEXT.get();
95+
96+
// Fallback -> If no context in thread local, try Context.current()
97+
if (parentContext == null) {
98+
parentContext = Context.current();
99+
}
100+
101+
// Store the parent context in the channel attributes
102+
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
103+
// context.
104+
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);
105+
106+
// Call the parent implementation which will use our stored parent context
107+
super.write(ctx, msg, prm);
108+
109+
// Clean up after use to prevent memory leaks
110+
SERVER_CONTEXT.remove();
111+
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
112+
cleanupThreadMapIfNeeded();
113+
114+
} catch (Exception ignored) {
115+
}
116+
}
117+
}

instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import io.opentelemetry.api.common.AttributeKey;
2828
import io.opentelemetry.api.trace.Span;
2929
import io.opentelemetry.context.Context;
30+
import io.opentelemetry.context.Scope;
3031
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.AttributeKeys;
3132
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.DataCaptureUtils;
33+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
3234
import java.nio.charset.Charset;
3335
import java.util.HashMap;
3436
import java.util.Map;
@@ -57,47 +59,54 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
5759
ctx.fireChannelRead(msg);
5860
return;
5961
}
60-
Span span = Span.fromContext(context);
6162

62-
if (msg instanceof HttpRequest) {
63-
HttpRequest httpRequest = (HttpRequest) msg;
63+
// Store the server context in our ThreadLocal for later use by client handlers
64+
// This is CRITICAL for proper context propagation to client spans
65+
OtelHttpClientRequestTracingHandler.storeServerContext(context);
6466

65-
Map<String, String> headersMap = headersToMap(httpRequest);
66-
if (instrumentationConfig.httpHeaders().request()) {
67-
headersMap.forEach(span::setAttribute);
68-
}
69-
// used by blocking handler
70-
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
67+
try (Scope ignored = context.makeCurrent()) {
68+
Span span = Span.fromContext(context);
69+
70+
if (msg instanceof HttpRequest) {
71+
HttpRequest httpRequest = (HttpRequest) msg;
7172

72-
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
73-
if (instrumentationConfig.httpBody().request()
74-
&& contentType != null
75-
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
73+
Map<String, String> headersMap = headersToMap(httpRequest);
74+
if (instrumentationConfig.httpHeaders().request()) {
75+
headersMap.forEach(span::setAttribute);
76+
}
77+
// used by blocking handler
78+
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
7679

77-
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
78-
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
80+
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
81+
if (instrumentationConfig.httpBody().request()
82+
&& contentType != null
83+
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
7984

80-
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
81-
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
85+
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
86+
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
8287

83-
// set the buffer to capture response body
84-
// the buffer is used byt captureBody method
85-
Attribute<BoundedByteArrayOutputStream> bufferAttr =
86-
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
87-
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
88+
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
89+
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
8890

89-
channel.attr(AttributeKeys.CHARSET).set(charset);
91+
// set the buffer to capture response body
92+
// the buffer is used byt captureBody method
93+
Attribute<BoundedByteArrayOutputStream> bufferAttr =
94+
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
95+
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
96+
97+
channel.attr(AttributeKeys.CHARSET).set(charset);
98+
}
9099
}
91-
}
92100

93-
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
94-
&& instrumentationConfig.httpBody().request()) {
95-
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
96-
if (charset == null) {
97-
charset = ContentTypeCharsetUtils.getDefaultCharset();
101+
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
102+
&& instrumentationConfig.httpBody().request()) {
103+
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
104+
if (charset == null) {
105+
charset = ContentTypeCharsetUtils.getDefaultCharset();
106+
}
107+
DataCaptureUtils.captureBody(
108+
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
98109
}
99-
DataCaptureUtils.captureBody(
100-
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
101110
}
102111

103112
ctx.fireChannelRead(msg);

instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientRequestTracingHandler;
3737
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientResponseTracingHandler;
3838
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.HttpClientTracingHandler;
39+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client.OtelHttpClientRequestTracingHandler;
3940
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerBlockingRequestHandler;
4041
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerRequestTracingHandler;
4142
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server.HttpServerResponseTracingHandler;
@@ -133,14 +134,13 @@ public static void addHandler(
133134
HttpClientTracingHandler.class.getName(),
134135
new HttpClientTracingHandler());
135136

136-
// add OTEL request handler to start spans
137+
// add our custom request handler to start spans with proper context propagation
137138
pipeline.addAfter(
138139
HttpClientTracingHandler.class.getName(),
139140
io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler
140141
.class
141142
.getName(),
142-
new io.opentelemetry.instrumentation.netty.v4_1.internal.client
143-
.HttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
143+
new OtelHttpClientRequestTracingHandler(NettyClientSingletons.instrumenter()));
144144
} else if (handler instanceof HttpRequestEncoder) {
145145
pipeline.addLast(
146146
HttpClientRequestTracingHandler.class.getName(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.client;
18+
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelPromise;
21+
import io.netty.handler.codec.http.HttpRequest;
22+
import io.netty.handler.codec.http.HttpResponse;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanContext;
25+
import io.opentelemetry.context.Context;
26+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
27+
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
28+
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
29+
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
32+
/**
33+
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
34+
* propagation by using Context.current() as the parent context.
35+
*/
36+
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {
37+
38+
// Store the server context for each thread
39+
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();
40+
41+
// Store the mapping from thread ID to server span context (for cross-thread scenarios)
42+
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
43+
new ConcurrentHashMap<>();
44+
45+
// Maximum size for the thread map before triggering cleanup
46+
private static final int MAX_THREAD_MAP_SIZE = 1000;
47+
48+
// Cleanup flag to avoid excessive synchronized blocks
49+
private static volatile boolean cleanupNeeded = false;
50+
51+
public OtelHttpClientRequestTracingHandler(
52+
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
53+
super(instrumenter);
54+
}
55+
56+
/**
57+
* Stores the current context as the server context for this thread. This should be called from
58+
* the server handler.
59+
*/
60+
public static void storeServerContext(Context context) {
61+
SERVER_CONTEXT.set(context);
62+
63+
// Also store the span context by thread ID for cross-thread lookup
64+
Span span = Span.fromContext(context);
65+
if (span != null && span.getSpanContext().isValid()) {
66+
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());
67+
68+
// Check if we need to clean up the map
69+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
70+
cleanupNeeded = true;
71+
}
72+
}
73+
}
74+
75+
/**
76+
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
77+
* block to prevent concurrent modification issues.
78+
*/
79+
private static void cleanupThreadMapIfNeeded() {
80+
if (cleanupNeeded) {
81+
synchronized (THREAD_TO_SPAN_CONTEXT) {
82+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
83+
THREAD_TO_SPAN_CONTEXT.clear();
84+
cleanupNeeded = false;
85+
}
86+
}
87+
}
88+
}
89+
90+
@Override
91+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
92+
try {
93+
if (!(msg instanceof HttpRequest)) {
94+
super.write(ctx, msg, prm);
95+
return;
96+
}
97+
98+
Context parentContext = SERVER_CONTEXT.get();
99+
100+
// Fallback -> If no context in thread local, try Context.current()
101+
if (parentContext == null) {
102+
parentContext = Context.current();
103+
}
104+
105+
// Store the parent context in the channel attributes
106+
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
107+
// context.
108+
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);
109+
110+
// Call the parent implementation which will use our stored parent context
111+
super.write(ctx, msg, prm);
112+
113+
// Clean up after use to prevent memory leaks
114+
SERVER_CONTEXT.remove();
115+
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
116+
cleanupThreadMapIfNeeded();
117+
118+
} catch (Exception ignored) {
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)