Skip to content

Commit 0058126

Browse files
committed
Merge remote-tracking branch 'origin/main' into merge-main-develop-strands
2 parents 6a04fd4 + 060ddbd commit 0058126

24 files changed

+1072
-669
lines changed

newrelic/api/asgi_application.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,20 @@ async def send_inject_browser_agent(self, message):
132132

133133
message_type = message["type"]
134134
if message_type == "http.response.start" and not self.initial_message:
135-
headers = list(message.get("headers", ()))
135+
# message["headers"] may be a generator, and consuming it via process_response will leave the original
136+
# application with no headers. Fix this by preserving them in a list before consuming them.
137+
if "headers" in message:
138+
message["headers"] = headers = list(message["headers"])
139+
else:
140+
headers = []
141+
142+
# Check if we should insert the HTML snippet based on the headers.
143+
# Currently if there are no headers this will always be False, but call the function
144+
# anyway in case this logic changes in the future.
136145
if not self.should_insert_html(headers):
137146
await self.abort()
138147
return
148+
139149
message["headers"] = headers
140150
self.initial_message = message
141151
elif message_type == "http.response.body" and self.initial_message:
@@ -232,7 +242,13 @@ async def send(self, event):
232242
finally:
233243
self.__exit__(*sys.exc_info())
234244
elif event["type"] == "http.response.start":
235-
self.process_response(event["status"], event.get("headers", ()))
245+
# event["headers"] may be a generator, and consuming it via process_response will leave the original
246+
# ASGI application with no headers. Fix this by preserving them in a list before consuming them.
247+
if "headers" in event:
248+
event["headers"] = headers = list(event["headers"])
249+
else:
250+
headers = []
251+
self.process_response(event["status"], headers)
236252
return await self._send(event)
237253

238254

newrelic/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2084,6 +2084,10 @@ def _process_module_builtin_defaults():
20842084
"asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events"
20852085
)
20862086

2087+
_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")
2088+
2089+
_process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners")
2090+
20872091
_process_module_definition(
20882092
"langchain_core.runnables.base",
20892093
"newrelic.hooks.mlmodel_langchain",
@@ -2671,8 +2675,6 @@ def _process_module_builtin_defaults():
26712675
"langchain_core.callbacks.manager", "newrelic.hooks.mlmodel_langchain", "instrument_langchain_callbacks_manager"
26722676
)
26732677

2674-
_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")
2675-
26762678
_process_module_definition("asgiref.sync", "newrelic.hooks.adapter_asgiref", "instrument_asgiref_sync")
26772679

26782680
_process_module_definition(

newrelic/hooks/coroutines_asyncio.py

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,73 @@
1616
from newrelic.core.trace_cache import trace_cache
1717

1818

19-
def remove_from_cache(task):
19+
def remove_from_cache_callback(task):
2020
cache = trace_cache()
2121
cache.task_stop(task)
2222

2323

24-
def propagate_task_context(task):
24+
def wrap_create_task(task):
2525
trace_cache().task_start(task)
26-
task.add_done_callback(remove_from_cache)
26+
task.add_done_callback(remove_from_cache_callback)
2727
return task
2828

2929

30-
def _bind_loop(loop, *args, **kwargs):
30+
def _instrument_event_loop(loop):
31+
if loop and hasattr(loop, "create_task") and not hasattr(loop.create_task, "__wrapped__"):
32+
wrap_out_function(loop, "create_task", wrap_create_task)
33+
34+
35+
def _bind_set_event_loop(loop, *args, **kwargs):
3136
return loop
3237

3338

34-
def wrap_create_task(wrapped, instance, args, kwargs):
35-
loop = _bind_loop(*args, **kwargs)
39+
def wrap_set_event_loop(wrapped, instance, args, kwargs):
40+
loop = _bind_set_event_loop(*args, **kwargs)
3641

37-
if loop and not hasattr(loop.create_task, "__wrapped__"):
38-
wrap_out_function(loop, "create_task", propagate_task_context)
42+
_instrument_event_loop(loop)
3943

4044
return wrapped(*args, **kwargs)
4145

4246

47+
def wrap__lazy_init(wrapped, instance, args, kwargs):
48+
result = wrapped(*args, **kwargs)
49+
# This logic can be used for uvloop, but should
50+
# work for any valid custom loop factory.
51+
52+
# A custom loop_factory will be used to create
53+
# a new event loop instance. It will then run
54+
# the main() coroutine on this event loop. Once
55+
# this coroutine is complete, the event loop will
56+
# be stopped and closed.
57+
58+
# The new loop that is created and set as the
59+
# running loop of the duration of the run() call.
60+
# When the coroutine starts, it runs in the context
61+
# that was active when run() was called. Any tasks
62+
# created within this coroutine on this new event
63+
# loop will inherit that context.
64+
65+
# Note: The loop created by loop_factory is never
66+
# set as the global current loop for the thread,
67+
# even while it is running.
68+
loop = instance._loop
69+
_instrument_event_loop(loop)
70+
71+
return result
72+
73+
4374
def instrument_asyncio_base_events(module):
44-
wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context)
75+
wrap_out_function(module, "BaseEventLoop.create_task", wrap_create_task)
4576

4677

4778
def instrument_asyncio_events(module):
4879
if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14
49-
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
50-
else: # Python <= 3.13
51-
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
80+
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)
81+
elif hasattr(module, "BaseDefaultEventLoopPolicy"): # Python <= 3.13
82+
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)
83+
84+
85+
# For Python >= 3.11
86+
def instrument_asyncio_runners(module):
87+
if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"):
88+
wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init)

newrelic/hooks/external_aiobotocore.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
149149
bedrock_attrs = extract_bedrock_converse_attrs(
150150
args[1], response, response_headers, model, span_id, trace_id
151151
)
152+
153+
if response_streaming:
154+
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
155+
# This class is used in numerous other services in botocore, and would cause conflicts.
156+
response["stream"] = stream = AsyncEventStreamWrapper(response["stream"])
157+
stream._nr_ft = ft or None
158+
stream._nr_bedrock_attrs = bedrock_attrs or {}
159+
stream._nr_model_extractor = stream_extractor or None
160+
stream._nr_is_converse = True
161+
return response
162+
152163
else:
153164
bedrock_attrs = {
154165
"request_id": response_headers.get("x-amzn-requestid"),

0 commit comments

Comments
 (0)