Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion newrelic/core/custom_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def create_custom_event(event_type, params, settings=None, is_ml_event=False):
)
return None

intrinsics = {"type": name, "timestamp": int(1000.0 * time.time())}
intrinsics = {"type": name, "timestamp": params.get("timestamp") or int(1000.0 * time.time())}

event = [intrinsics, attributes]
return event
90 changes: 70 additions & 20 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
import re
import sys
import time
import uuid
from io import BytesIO

Expand Down Expand Up @@ -193,6 +194,7 @@ def create_chat_completion_message_event(
request_id,
llm_metadata_dict,
response_id=None,
request_timestamp=None,
):
if not transaction:
return
Expand Down Expand Up @@ -227,6 +229,8 @@ def create_chat_completion_message_event(

if settings.ai_monitoring.record_content.enabled:
chat_completion_message_dict["content"] = content
if request_timestamp:
chat_completion_message_dict["timestamp"] = request_timestamp

chat_completion_message_dict.update(llm_metadata_dict)

Expand Down Expand Up @@ -542,10 +546,22 @@ def extract_bedrock_cohere_model_streaming_response(response_body, bedrock_attrs


def handle_bedrock_exception(
exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction, kwargs, is_converse
exc,
is_embedding,
model,
span_id,
trace_id,
request_extractor,
request_body,
ft,
transaction,
kwargs,
is_converse,
request_timestamp=None,
):
try:
bedrock_attrs = {"model": model, "span_id": span_id, "trace_id": trace_id}

if is_converse:
try:
input_message_list = [
Expand Down Expand Up @@ -589,12 +605,14 @@ def handle_bedrock_exception(
if is_embedding:
handle_embedding_event(transaction, error_attributes)
else:
handle_chat_completion_event(transaction, error_attributes)
handle_chat_completion_event(transaction, error_attributes, request_timestamp)
except Exception:
_logger.warning(EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE, exc_info=True)


def run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction):
def run_bedrock_response_extractor(
response_extractor, response_body, bedrock_attrs, is_embedding, transaction, request_timestamp=None
):
# Run response extractor for non-streaming responses
try:
response_extractor(response_body, bedrock_attrs)
Expand All @@ -604,7 +622,7 @@ def run_bedrock_response_extractor(response_extractor, response_body, bedrock_at
if is_embedding:
handle_embedding_event(transaction, bedrock_attrs)
else:
handle_chat_completion_event(transaction, bedrock_attrs)
handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp)


def run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs):
Expand All @@ -628,6 +646,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
if not settings.ai_monitoring.enabled:
return wrapped(*args, **kwargs)

request_timestamp = int(1000.0 * time.time())

transaction.add_ml_model_info("Bedrock", BOTOCORE_VERSION)
transaction._add_agent_attribute("llm", True)

Expand Down Expand Up @@ -703,6 +723,7 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
transaction,
kwargs,
is_converse=False,
request_timestamp=request_timestamp,
)
raise

Expand Down Expand Up @@ -733,6 +754,8 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)

try:
bedrock_attrs.pop("timestamp", None) # The request timestamp is only needed for request extraction

if response_streaming:
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
# This class is used in numerous other services in botocore, and would cause conflicts.
Expand All @@ -748,7 +771,14 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
bedrock_attrs["duration"] = ft.duration * 1000
response["body"] = StreamingBody(BytesIO(response_body), len(response_body))

run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)
run_bedrock_response_extractor(
response_extractor,
response_body,
bedrock_attrs,
is_embedding,
transaction,
request_timestamp=request_timestamp,
)

except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
Expand All @@ -770,6 +800,8 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
if not settings.ai_monitoring.enabled:
return wrapped(*args, **kwargs)

request_timestamp = int(1000.0 * time.time())

transaction.add_ml_model_info("Bedrock", BOTOCORE_VERSION)
transaction._add_agent_attribute("llm", True)

Expand Down Expand Up @@ -810,7 +842,18 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
response = wrapped(*args, **kwargs)
except Exception as exc:
handle_bedrock_exception(
exc, False, model, span_id, trace_id, request_extractor, {}, ft, transaction, kwargs, is_converse=True
exc,
False,
model,
span_id,
trace_id,
request_extractor,
{},
ft,
transaction,
kwargs,
is_converse=True,
request_timestamp=request_timestamp,
)
raise

Expand All @@ -824,11 +867,14 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):

response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
bedrock_attrs = extract_bedrock_converse_attrs(kwargs, response, response_headers, model, span_id, trace_id)
bedrock_attrs["timestamp"] = request_timestamp

try:
ft.__exit__(None, None, None)
bedrock_attrs["duration"] = ft.duration * 1000
run_bedrock_response_extractor(response_extractor, {}, bedrock_attrs, False, transaction)
run_bedrock_response_extractor(
response_extractor, {}, bedrock_attrs, False, transaction, request_timestamp=request_timestamp
)

except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
Expand Down Expand Up @@ -881,6 +927,7 @@ def __iter__(self):
class GeneratorProxy(ObjectProxy):
def __init__(self, wrapped):
    """Wrap a streaming-response generator, recording when the request began.

    ``wrapped`` is the underlying botocore event-stream generator; the proxy
    delegates all attribute access to it via ObjectProxy.
    """
    super().__init__(wrapped)
    # Epoch timestamp in milliseconds, captured at proxy creation so LLM
    # events emitted later (on stream completion/error) carry the request
    # start time rather than the time the stream finished.
    # NOTE(review): assumes the proxy is constructed at (or near) request
    # time — confirm against the instrumentation that wraps the stream.
    self.request_timestamp = int(1000.0 * time.time())

def __iter__(self):
    # Iterator protocol: the proxy is its own iterator so that __next__
    # (instrumented elsewhere in this class) sees every chunk.
    return self
Expand All @@ -893,12 +940,12 @@ def __next__(self):
return_val = None
try:
return_val = self.__wrapped__.__next__()
record_stream_chunk(self, return_val, transaction)
record_stream_chunk(self, return_val, transaction, request_timestamp=self.request_timestamp)
except StopIteration:
record_events_on_stop_iteration(self, transaction)
record_events_on_stop_iteration(self, transaction, request_timestamp=self.request_timestamp)
raise
except Exception as exc:
record_error(self, transaction, exc)
record_error(self, transaction, exc, request_timestamp=self.request_timestamp)
raise
return return_val

Expand All @@ -918,6 +965,7 @@ def __aiter__(self):
class AsyncGeneratorProxy(ObjectProxy):
def __init__(self, wrapped):
    """Wrap an async streaming-response generator, recording the request start.

    ``wrapped`` is the underlying async botocore event-stream generator; the
    proxy delegates all attribute access to it via ObjectProxy.
    """
    super().__init__(wrapped)
    # Epoch timestamp in milliseconds, captured once at proxy creation;
    # mirrors the synchronous GeneratorProxy so LLM events report the
    # request start time instead of stream-completion time.
    self.request_timestamp = int(1000.0 * time.time())

def __aiter__(self):
    # Async-iterator protocol: the proxy is its own async iterator so that
    # __anext__ (instrumented elsewhere in this class) sees every chunk.
    return self
Expand All @@ -929,20 +977,20 @@ async def __anext__(self):
return_val = None
try:
return_val = await self.__wrapped__.__anext__()
record_stream_chunk(self, return_val, transaction)
record_stream_chunk(self, return_val, transaction, request_timestamp=self.request_timestamp)
except StopAsyncIteration:
record_events_on_stop_iteration(self, transaction)
record_events_on_stop_iteration(self, transaction, request_timestamp=self.request_timestamp)
raise
except Exception as exc:
record_error(self, transaction, exc)
record_error(self, transaction, exc, request_timestamp=self.request_timestamp)
raise
return return_val

async def aclose(self):
    # Pass-through close: delegate to the wrapped async generator's aclose()
    # so early termination by the caller still cleans up the underlying stream.
    return await super().aclose()


def record_stream_chunk(self, return_val, transaction):
def record_stream_chunk(self, return_val, transaction, request_timestamp=None):
if return_val:
try:
chunk = json.loads(return_val["chunk"]["bytes"].decode("utf-8"))
Expand All @@ -951,12 +999,12 @@ def record_stream_chunk(self, return_val, transaction):
# So we need to call the record events here since stop iteration will not be raised.
_type = chunk.get("type")
if _type == "content_block_stop":
record_events_on_stop_iteration(self, transaction)
record_events_on_stop_iteration(self, transaction, request_timestamp)
except Exception:
_logger.warning(RESPONSE_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)


def record_events_on_stop_iteration(self, transaction):
def record_events_on_stop_iteration(self, transaction, request_timestamp=None):
if hasattr(self, "_nr_ft"):
bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
self._nr_ft.__exit__(None, None, None)
Expand All @@ -967,15 +1015,15 @@ def record_events_on_stop_iteration(self, transaction):

try:
bedrock_attrs["duration"] = self._nr_ft.duration * 1000
handle_chat_completion_event(transaction, bedrock_attrs)
handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp)
except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)

# Clear cached data as this can be very large.
self._nr_bedrock_attrs.clear()


def record_error(self, transaction, exc):
def record_error(self, transaction, exc, request_timestamp=None):
if hasattr(self, "_nr_ft"):
try:
ft = self._nr_ft
Expand All @@ -998,7 +1046,7 @@ def record_error(self, transaction, exc):
ft.__exit__(*sys.exc_info())
error_attributes["duration"] = ft.duration * 1000

handle_chat_completion_event(transaction, error_attributes)
handle_chat_completion_event(transaction, error_attributes, request_timestamp)

# Clear cached data as this can be very large.
error_attributes.clear()
Expand Down Expand Up @@ -1047,7 +1095,7 @@ def handle_embedding_event(transaction, bedrock_attrs):
transaction.record_custom_event("LlmEmbedding", embedding_dict)


def handle_chat_completion_event(transaction, bedrock_attrs):
def handle_chat_completion_event(transaction, bedrock_attrs, request_timestamp=None):
chat_completion_id = str(uuid.uuid4())
# Grab LLM-related custom attributes off of the transaction to store as metadata on LLM events
custom_attrs_dict = transaction._custom_params
Expand Down Expand Up @@ -1091,6 +1139,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
"response.number_of_messages": number_of_messages,
"response.choices.finish_reason": bedrock_attrs.get("response.choices.finish_reason", None),
"error": bedrock_attrs.get("error", None),
"timestamp": request_timestamp or None,
}
chat_completion_summary_dict.update(llm_metadata_dict)
chat_completion_summary_dict = {k: v for k, v in chat_completion_summary_dict.items() if v is not None}
Expand All @@ -1107,6 +1156,7 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
request_id=request_id,
llm_metadata_dict=llm_metadata_dict,
response_id=response_id,
request_timestamp=request_timestamp,
)


Expand Down
12 changes: 12 additions & 0 deletions newrelic/hooks/mlmodel_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import sys
import time
import uuid

import google
Expand Down Expand Up @@ -221,6 +222,8 @@ def wrap_generate_content_sync(wrapped, instance, args, kwargs):
if not settings.ai_monitoring.enabled:
return wrapped(*args, **kwargs)

kwargs["timestamp"] = int(1000.0 * time.time())

# Framework metric also used for entity tagging in the UI
transaction.add_ml_model_info("Gemini", GEMINI_VERSION)
transaction._add_agent_attribute("llm", True)
Expand Down Expand Up @@ -255,6 +258,8 @@ async def wrap_generate_content_async(wrapped, instance, args, kwargs):
if not settings.ai_monitoring.enabled:
return await wrapped(*args, **kwargs)

kwargs["timestamp"] = int(1000.0 * time.time())

# Framework metric also used for entity tagging in the UI
transaction.add_ml_model_info("Gemini", GEMINI_VERSION)
transaction._add_agent_attribute("llm", True)
Expand Down Expand Up @@ -339,6 +344,7 @@ def _record_generation_error(transaction, linking_metadata, completion_id, kwarg
"ingest_source": "Python",
"duration": ft.duration * 1000,
"error": True,
"timestamp": kwargs.get("timestamp") or None,
}
llm_metadata = _get_llm_attributes(transaction)
error_chat_completion_dict.update(llm_metadata)
Expand All @@ -357,6 +363,7 @@ def _record_generation_error(transaction, linking_metadata, completion_id, kwarg
request_model,
llm_metadata,
output_message_list,
kwargs.get("timestamp") or None,
)
except Exception:
_logger.warning(RECORD_EVENTS_FAILURE_LOG_MESSAGE, exc_info=True)
Expand Down Expand Up @@ -436,6 +443,7 @@ def _record_generation_success(transaction, linking_metadata, completion_id, kwa
# message This value should be 2 in almost all cases since we will report a summary event for each
# separate request (every input and output from the LLM)
"response.number_of_messages": 1 + len(output_message_list),
"timestamp": kwargs.get("timestamp") or None,
}

llm_metadata = _get_llm_attributes(transaction)
Expand All @@ -452,6 +460,7 @@ def _record_generation_success(transaction, linking_metadata, completion_id, kwa
request_model,
llm_metadata,
output_message_list,
kwargs.get("timestamp") or None,
)
except Exception:
_logger.warning(RECORD_EVENTS_FAILURE_LOG_MESSAGE, exc_info=True)
Expand All @@ -467,6 +476,7 @@ def create_chat_completion_message_event(
request_model,
llm_metadata,
output_message_list,
request_timestamp=None,
):
try:
settings = transaction.settings or global_settings()
Expand Down Expand Up @@ -510,6 +520,8 @@ def create_chat_completion_message_event(

if settings.ai_monitoring.record_content.enabled:
chat_completion_input_message_dict["content"] = input_message_content
if request_timestamp:
chat_completion_input_message_dict["timestamp"] = request_timestamp

chat_completion_input_message_dict.update(llm_metadata)

Expand Down
Loading