From 7149fed6a6f36abe1e3967f60407908dcdff37e0 Mon Sep 17 00:00:00 2001 From: shuningc Date: Mon, 17 Nov 2025 18:36:42 -0800 Subject: [PATCH 01/14] Adding Llamaindex llm instrumentation spike --- .../pyproject.toml | 58 +++++ .../instrumentation/llamaindex/__init__.py | 67 ++++++ .../llamaindex/callback_handler.py | 222 ++++++++++++++++++ .../instrumentation/llamaindex/config.py | 3 + .../instrumentation/llamaindex/version.py | 1 + .../tests/test_llm_instrumentation.py | 190 +++++++++++++++ 6 files changed, 541 insertions(+) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml new file mode 100644 index 00000000..55a67080 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml @@ -0,0 +1,58 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "splunk-otel-instrumentation-llamaindex" +dynamic = ["version"] +description = "OpenTelemetry LlamaIndex instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.38.0.dev0", + "opentelemetry-instrumentation ~= 0.59b0.dev0", + "opentelemetry-semantic-conventions ~= 0.59b0.dev0", + "splunk-otel-util-genai>=0.1.4", +] + +[project.optional-dependencies] +instruments = ["llama-index-core >= 0.14.0"] +test = [ + "llama-index-core >= 0.14.0", + "llama-index-llms-openai >= 0.6.0", + "pytest >= 7.0.0", +] + +[project.entry-points.opentelemetry_instrumentor] +llamaindex = "opentelemetry.instrumentation.llamaindex:LlamaindexInstrumentor" + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-llamaindex" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/llamaindex/version.py" + +[tool.hatch.build.targets.sdist] +include = ["/src", "/tests"] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.ruff] +exclude = ["./"] diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py new file mode 100644 index 00000000..c3bea984 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -0,0 +1,67 @@ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.instrumentation.llamaindex.config import Config +from opentelemetry.instrumentation.llamaindex.callback_handler import ( + LlamaindexCallbackHandler, +) +from wrapt import wrap_function_wrapper + +_instruments = ("llama-index-core >= 0.14.0",) + + +class LlamaindexInstrumentor(BaseInstrumentor): + def __init__( + self, + exception_logger=None, + disable_trace_context_propagation=False, + use_legacy_attributes: bool = True, + ): + super().__init__() + Config._exception_logger = exception_logger + Config.use_legacy_attributes = use_legacy_attributes + self._disable_trace_context_propagation = ( + disable_trace_context_propagation + ) + self._telemetry_handler = None + + def instrumentation_dependencies(self): + return _instruments + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + self._telemetry_handler = get_telemetry_handler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) + + llamaindexCallBackHandler = LlamaindexCallbackHandler( + telemetry_handler=self._telemetry_handler + ) + + wrap_function_wrapper( + module="llama_index.core.callbacks.base", + name="CallbackManager.__init__", + wrapper=_BaseCallbackManagerInitWrapper(llamaindexCallBackHandler), + ) + + def _uninstrument(self, **kwargs): + pass + + +class _BaseCallbackManagerInitWrapper: + def __init__(self, callback_handler: "LlamaindexCallbackHandler"): + self._callback_handler = callback_handler + + def __call__(self, wrapped, instance, args, kwargs) -> None: + wrapped(*args, **kwargs) + # LlamaIndex uses 'handlers' instead of 'inheritable_handlers' + for handler in instance.handlers: + if isinstance(handler, type(self._callback_handler)): + break + else: + self._callback_handler._callback_manager = instance + instance.add_handler(self._callback_handler) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py new file mode 100644 index 00000000..7846cfbe --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -0,0 +1,222 @@ +from typing import Any, Dict, Optional + +from llama_index.core.callbacks.base_handler import BaseCallbackHandler +from llama_index.core.callbacks.schema import CBEventType + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + InputMessage, + LLMInvocation, + OutputMessage, + Text, +) + + +def _safe_str(value: Any) -> str: + """Safely convert value to string.""" + try: + return str(value) + except (TypeError, ValueError): + return "" + + +class 
LlamaindexCallbackHandler(BaseCallbackHandler): + """Simplified LlamaIndex callback handler - LLM invocation only.""" + + def __init__( + self, + telemetry_handler: Optional[TelemetryHandler] = None, + ) -> None: + super().__init__( + event_starts_to_ignore=[], + event_ends_to_ignore=[], + ) + self._handler = telemetry_handler + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Start a trace - required by BaseCallbackHandler.""" + pass + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, Any]] = None, + ) -> None: + """End a trace - required by BaseCallbackHandler.""" + pass + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + """Handle event start - only processing LLM events.""" + if event_type == CBEventType.LLM: + self._handle_llm_start(event_id, parent_id, payload, **kwargs) + return event_id + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + """Handle event end - only processing LLM events.""" + if event_type == CBEventType.LLM: + self._handle_llm_end(event_id, payload, **kwargs) + + def _handle_llm_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle LLM invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model") + or serialized.get("model_name") + or "unknown" + ) + + # Extract messages from payload + # LlamaIndex messages are ChatMessage objects with .content and .role properties + messages = payload.get("messages", []) + input_messages = [] + + for msg in messages: + # Handle ChatMessage objects (has .content property and .role attribute) + if hasattr(msg, "content") and hasattr(msg, "role"): + # Extract role - could be MessageRole enum + role_value = ( + str(msg.role.value) + if hasattr(msg.role, "value") + else str(msg.role) + ) + # Extract content - this is a property that pulls from blocks[0].text + content = _safe_str(msg.content) + input_messages.append( + InputMessage( + role=role_value, parts=[Text(content=content)] + ) + ) + elif isinstance(msg, dict): + # Handle serialized messages (dict format) + role = msg.get("role", "user") + # Try to extract from blocks first (LlamaIndex format) + blocks = msg.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + # Fallback to direct content field + content = msg.get("content", "") + + role_value = ( + str(role.value) if hasattr(role, "value") else str(role) + ) + input_messages.append( + InputMessage( + role=role_value, + parts=[Text(content=_safe_str(content))], + ) + ) + + # Create LLM invocation with event_id as run_id + llm_inv = LLMInvocation( + request_model=_safe_str(model_name), + input_messages=input_messages, + attributes={}, + run_id=event_id, # Use event_id as run_id for registry lookup + ) + llm_inv.framework = "llamaindex" + + # Start the LLM invocation (handler stores it in _entity_registry) + self._handler.start_llm(llm_inv) + + def _handle_llm_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle LLM invocation end.""" + if not self._handler: + return + + # Get the LLM invocation from handler's 
registry using event_id + llm_inv = self._handler.get_entity(event_id) + if not llm_inv or not isinstance(llm_inv, LLMInvocation): + return + + if payload: + # Extract response from payload + response = payload.get("response") + + # Handle both dict and object types for response + if response: + # Get message - could be dict or object + if isinstance(response, dict): + message = response.get("message", {}) + raw_response = response.get("raw") + else: + # response is a ChatResponse object + message = getattr(response, "message", None) + raw_response = getattr(response, "raw", None) + + # Extract content from message + if message: + if isinstance(message, dict): + # Message is dict + blocks = message.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + content = message.get("content", "") + else: + # Message is ChatMessage object + blocks = getattr(message, "blocks", []) + if blocks and len(blocks) > 0: + content = getattr(blocks[0], "text", "") + else: + content = getattr(message, "content", "") + + # Create output message + llm_inv.output_messages = [ + OutputMessage( + role="assistant", + parts=[Text(content=_safe_str(content))], + finish_reason="stop", + ) + ] + + # Extract token usage from response.raw (OpenAI format) + # LlamaIndex stores the raw API response (e.g., OpenAI response) in response.raw + # raw_response could be a dict or an object (e.g., ChatCompletion from OpenAI) + if raw_response: + # Try to get usage from dict or object + if isinstance(raw_response, dict): + usage = raw_response.get("usage", {}) + else: + # It's an object, try to get usage attribute + usage = getattr(raw_response, "usage", None) + + if usage: + # usage could also be dict or object + if isinstance(usage, dict): + llm_inv.input_tokens = usage.get("prompt_tokens") + llm_inv.output_tokens = usage.get("completion_tokens") + else: + llm_inv.input_tokens = getattr(usage, "prompt_tokens", None) + llm_inv.output_tokens = getattr(usage, "completion_tokens", None) + + # Stop the LLM invocation + self._handler.stop_llm(llm_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py new file mode 100644 index 00000000..44199c03 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py @@ -0,0 +1,3 @@ +class Config: + exception_logger = None + use_legacy_attributes = True diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py new file mode 100644 index 00000000..3dc1f76b --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py new file mode 100644 index 00000000..50324c37 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py @@ -0,0 +1,190 @@ +"""Tests for LlamaIndex LLM instrumentation with OpenTelemetry.""" + 
+import os + +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.llms.mock import MockLLM +from opentelemetry import metrics, trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + + +def setup_telemetry(): + """Setup OpenTelemetry with both trace and metrics exporters.""" + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + return tracer_provider, meter_provider, metric_reader + + +def test_with_openai(): + """Test with real OpenAI API - requires OPENAI_API_KEY environment variable.""" + from llama_index.llms.openai import OpenAI + + print("=" * 80) + print("Testing with OpenAI API") + print("=" * 80) + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." + ), + ChatMessage( + role=MessageRole.USER, content="Say hello in exactly 5 words" + ), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content}") + + if hasattr(response, "raw") and response.raw: + if isinstance(response.raw, dict): + usage = response.raw.get("usage", {}) + else: + usage = getattr(response.raw, "usage", None) + + if usage: + if isinstance(usage, dict): + prompt_tokens = usage.get("prompt_tokens") + completion_tokens = usage.get("completion_tokens") + total_tokens = usage.get("total_tokens") + else: + prompt_tokens = getattr(usage, "prompt_tokens", None) + completion_tokens = getattr(usage, "completion_tokens", None) + total_tokens = getattr(usage, "total_tokens", None) + + print(f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}") + + print("=" * 80) + + +class MockLLMWithUsage(MockLLM): + """MockLLM that includes fake usage data for testing.""" + + def _complete(self, prompt, **kwargs): + """Override internal complete to inject usage data.""" + response = super()._complete(prompt, **kwargs) + # Note: MockLLM uses _complete internally, but we can't easily inject + # usage here because the ChatResponse is created later + return response + + +def test_with_mock(): + """Test with MockLLM - no API key needed.""" + print("=" * 80) + print("Testing with MockLLM") + print("=" * 80) + + llm = MockLLM(max_tokens=50) + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." 
+ ), + ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:100]}...") + print("=" * 80) + + +def test_message_extraction(): + """Test message extraction.""" + print("\n" + "=" * 80) + print("Testing message extraction") + print("=" * 80) + + llm = MockLLM(max_tokens=20) + messages = [ + ChatMessage(role=MessageRole.SYSTEM, content="You are helpful."), + ChatMessage(role=MessageRole.USER, content="Test message"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:50]}...") + print("=" * 80) + + +if __name__ == "__main__": + # Enable metrics emission + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup telemetry + tracer_provider, meter_provider, metric_reader = setup_telemetry() + + # Instrument LlamaIndex + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + meter_provider=meter_provider + ) + print("LlamaIndex instrumentation enabled\n") + + # Run tests + if os.environ.get("OPENAI_API_KEY"): + print("Testing with real OpenAI API\n") + test_with_openai() + else: + print("Testing with MockLLM (set OPENAI_API_KEY to test real API)\n") + test_with_mock() + + # Test message extraction + test_message_extraction() + + # Check metrics + print("\n" + "=" * 80) + print("Metrics Summary") + print("=" * 80) + + metrics_data = metric_reader.get_metrics_data() + found_duration = False + found_token_usage = False + + if metrics_data: + for rm in getattr(metrics_data, "resource_metrics", []) or []: + for scope in getattr(rm, "scope_metrics", []) or []: + for metric in getattr(scope, "metrics", []) or []: + print(f"\nMetric: {metric.name}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION: + found_duration = True + dps = getattr(metric.data, "data_points", []) + if dps: + print(f" Duration: {dps[0].sum:.4f} seconds") + print(f" Count: {dps[0].count}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE: + found_token_usage = True + dps = getattr(metric.data, "data_points", []) + for dp in dps: + token_type = dp.attributes.get("gen_ai.token.type", "unknown") + print(f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}") + + print("\n" + "=" * 80) + status = [] + if found_duration: + status.append("Duration: OK") + if found_token_usage: + status.append("Token Usage: OK") + if not found_duration and not found_token_usage: + status.append("No metrics (use real API for metrics)") + + print("Status: " + " | ".join(status)) + print("=" * 80) From 0bde623c06ab03d8719fe34302ae466e56b05231 Mon Sep 17 00:00:00 2001 From: shuningc Date: Mon, 17 Nov 2025 18:43:00 -0800 Subject: [PATCH 02/14] Updating readme --- .../README.rst | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst new file mode 100644 index 00000000..5371d3c0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst @@ -0,0 +1,155 @@ +OpenTelemetry LlamaIndex Instrumentation +========================================= + +This library provides automatic instrumentation for LlamaIndex applications using OpenTelemetry. 
+ +Installation +------------ + +Development installation:: + + # Install the package in editable mode + cd instrumentation-genai/opentelemetry-instrumentation-llamaindex + pip install -e . + + # Install test dependencies + pip install -e ".[test]" + + # Install util-genai (required for telemetry) + cd ../../util/opentelemetry-util-genai + pip install -e . + + +Quick Start +----------- + +.. code-block:: python + + import os + from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import InMemoryMetricReader + + # Enable metrics (default is spans only) + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation with providers + LlamaindexInstrumentor().instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=meter_provider + ) + + # Use LlamaIndex as normal + from llama_index.llms.openai import OpenAI + from llama_index.core.llms import ChatMessage, MessageRole + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ChatMessage(role=MessageRole.USER, content="Hello")] + response = llm.chat(messages) + + +Running Tests +------------- + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_llm_instrumentation.py + + +Expected Output +--------------- + +**Span Attributes**:: + + { + "gen_ai.framework": "llamaindex", + "gen_ai.request.model": "gpt-3.5-turbo", + "gen_ai.operation.name": "chat", + "gen_ai.usage.input_tokens": 24, + "gen_ai.usage.output_tokens": 7 + } + +**Metrics**:: + + Metric: gen_ai.client.operation.duration + Duration: 0.6900 seconds + Count: 1 + + Metric: gen_ai.client.token.usage + Token type: input, Sum: 24, Count: 1 + Token type: output, Sum: 7, Count: 1 + + +Key Implementation Differences from LangChain +---------------------------------------------- + +**1. Event-Based Callbacks** + +LlamaIndex uses ``on_event_start(event_type, ...)`` and ``on_event_end(event_type, ...)`` +instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``). + +Event types are dispatched via ``CBEventType`` enum:: + + CBEventType.LLM # LLM invocations + CBEventType.AGENT # Agent steps + CBEventType.EMBEDDING # Embedding operations + +**2. Handler Registration** + +LlamaIndex uses ``handlers`` list:: + + callback_manager.handlers.append(handler) + +LangChain uses ``inheritable_handlers``:: + + callback_manager.inheritable_handlers.append(handler) + +**3. Response Structure** + +LlamaIndex ``ChatMessage`` uses ``blocks`` (list of TextBlock objects):: + + message.content # Computed property from blocks[0].text + +LangChain uses simple strings:: + + message.content # Direct string property + +**4. 
Token Usage** + +LlamaIndex returns objects (not dicts):: + + response.raw.usage.prompt_tokens # Object attribute + response.raw.usage.completion_tokens # Object attribute + +LangChain returns dicts:: + + response["usage"]["prompt_tokens"] # Dict key + response["usage"]["completion_tokens"] # Dict key + + +References +---------- + +* `OpenTelemetry Project `_ +* `LlamaIndex `_ +* `LlamaIndex Callbacks `_ From 633d0af33df0d2670a95f1c06b3ab4ca81cb4a99 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 18 Nov 2025 14:39:39 -0800 Subject: [PATCH 03/14] feat: Add embedding instrumentation for LlamaIndex - Add embedding event handlers (_handle_embedding_start, _handle_embedding_end) - Extract model name, input texts, and dimension count from embedding events - Create vendor_detection.py module with VendorRule-based provider detection - Support 13+ embedding providers (OpenAI, Azure, AWS, Google, Cohere, etc.) - Add test_embedding_instrumentation.py with single and batch embedding tests - Update README with embedding documentation and provider list - Tested successfully with OpenAI embeddings API --- .../README.rst | 70 +++++++- .../llamaindex/callback_handler.py | 89 ++++++++++- .../llamaindex/vendor_detection.py | 119 ++++++++++++++ .../tests/test_embedding_instrumentation.py | 151 ++++++++++++++++++ 4 files changed, 422 insertions(+), 7 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst index 5371d3c0..bf9ea59b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst @@ -65,6 +65,8 @@ Quick Start Running Tests ------------- +**LLM Tests**: + .. code-block:: bash # Set environment variables @@ -75,11 +77,23 @@ Running Tests cd tests python test_llm_instrumentation.py +**Embedding Tests**: + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_embedding_instrumentation.py + Expected Output --------------- -**Span Attributes**:: +**LLM Span Attributes**:: { "gen_ai.framework": "llamaindex", @@ -89,6 +103,15 @@ Expected Output "gen_ai.usage.output_tokens": 7 } +**Embedding Span Attributes**:: + + { + "gen_ai.operation.name": "embeddings", + "gen_ai.request.model": "text-embedding-3-small", + "gen_ai.provider.name": "openai", + "gen_ai.embeddings.dimension.count": 1536 + } + **Metrics**:: Metric: gen_ai.client.operation.duration @@ -110,9 +133,9 @@ instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``) Event types are dispatched via ``CBEventType`` enum:: - CBEventType.LLM # LLM invocations - CBEventType.AGENT # Agent steps - CBEventType.EMBEDDING # Embedding operations + CBEventType.LLM # LLM invocations (chat, complete) + CBEventType.AGENT # Agent steps (not yet instrumented) + CBEventType.EMBEDDING # Embedding operations (get_text_embedding, get_text_embedding_batch) **2. 
Handler Registration** @@ -147,6 +170,45 @@ LangChain returns dicts:: response["usage"]["completion_tokens"] # Dict key +Supported Features +------------------ + +**LLM Operations** + +* ✅ Chat completion (``llm.chat()``, ``llm.stream_chat()``) +* ✅ Text completion (``llm.complete()``, ``llm.stream_complete()``) +* ✅ Token usage tracking +* ✅ Model name detection +* ✅ Framework attribution + +**Embedding Operations** + +* ✅ Single text embedding (``embed_model.get_text_embedding()``) +* ✅ Batch embedding (``embed_model.get_text_embedding_batch()``) +* ✅ Query embedding (``embed_model.get_query_embedding()``) +* ✅ Provider detection (OpenAI, Azure, AWS Bedrock, Google, Cohere, HuggingFace, Ollama, and more) +* ✅ Dimension count tracking +* ✅ Input text capture + +**Provider Detection** + +Embedding instrumentation automatically detects the provider from class names: + +* **OpenAI**: ``OpenAIEmbedding`` +* **Azure**: ``AzureOpenAIEmbedding`` +* **AWS**: ``BedrockEmbedding`` +* **Google**: ``GeminiEmbedding``, ``VertexTextEmbedding``, ``GooglePaLMEmbedding`` +* **Cohere**: ``CohereEmbedding`` +* **HuggingFace**: ``HuggingFaceEmbedding``, ``HuggingFaceInferenceAPIEmbedding`` +* **Ollama**: ``OllamaEmbedding`` +* **Anthropic**: ``AnthropicEmbedding`` +* **MistralAI**: ``MistralAIEmbedding`` +* **Together**: ``TogetherEmbedding`` +* **Fireworks**: ``FireworksEmbedding`` +* **Voyage**: ``VoyageEmbedding`` +* **Jina**: ``JinaEmbedding`` + + References ---------- diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index 7846cfbe..fe9a1a1f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -5,12 +5,15 @@ from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( + EmbeddingInvocation, InputMessage, LLMInvocation, OutputMessage, Text, ) +from .vendor_detection import detect_vendor_from_class + def _safe_str(value: Any) -> str: """Safely convert value to string.""" @@ -21,7 +24,7 @@ def _safe_str(value: Any) -> str: class LlamaindexCallbackHandler(BaseCallbackHandler): - """Simplified LlamaIndex callback handler - LLM invocation only.""" + """LlamaIndex callback handler supporting LLM and Embedding instrumentation.""" def __init__( self, @@ -53,9 +56,11 @@ def on_event_start( parent_id: str = "", **kwargs: Any, ) -> str: - """Handle event start - only processing LLM events.""" + """Handle event start - processing LLM and EMBEDDING events.""" if event_type == CBEventType.LLM: self._handle_llm_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + self._handle_embedding_start(event_id, parent_id, payload, **kwargs) return event_id def on_event_end( @@ -65,9 +70,11 @@ def on_event_end( event_id: str = "", **kwargs: Any, ) -> None: - """Handle event end - only processing LLM events.""" + """Handle event end - processing LLM and EMBEDDING events.""" if event_type == CBEventType.LLM: self._handle_llm_end(event_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + self._handle_embedding_end(event_id, payload, **kwargs) def _handle_llm_start( self, @@ -220,3 +227,79 @@ def 
_handle_llm_end( # Stop the LLM invocation self._handler.stop_llm(llm_inv) + + def _handle_embedding_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle embedding invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model_name") + or serialized.get("model") + or "unknown" + ) + + # Detect provider from class name + class_name = serialized.get("class_name", "") + provider = detect_vendor_from_class(class_name) + + # Note: input texts are not available at start time in LlamaIndex + # They will be available in the end event payload + + # Create embedding invocation with event_id as run_id + emb_inv = EmbeddingInvocation( + request_model=_safe_str(model_name), + input_texts=[], # Will be populated on end event + provider=provider, + attributes={}, + run_id=event_id, + ) + emb_inv.framework = "llamaindex" + + # Start the embedding invocation + self._handler.start_embedding(emb_inv) + + def _handle_embedding_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle embedding invocation end.""" + if not self._handler: + return + + # Get the embedding invocation from handler's registry using event_id + emb_inv = self._handler.get_entity(event_id) + if not emb_inv or not isinstance(emb_inv, EmbeddingInvocation): + return + + if payload: + # Extract input chunks (texts) from response + # chunks is the list of input texts that were embedded + chunks = payload.get("chunks", []) + if chunks: + emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks] + + # Extract embedding vectors from response + # embeddings is the list of output vectors + embeddings = payload.get("embeddings", []) + + # Determine dimension from first embedding vector + if embeddings and len(embeddings) > 0: + first_embedding = embeddings[0] + if isinstance(first_embedding, list): + emb_inv.dimension_count = len(first_embedding) + elif hasattr(first_embedding, "__len__"): + emb_inv.dimension_count = len(first_embedding) + + # Stop the embedding invocation + self._handler.stop_embedding(emb_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py new file mode 100644 index 00000000..6f9c9f06 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py @@ -0,0 +1,119 @@ +"""Vendor detection for LlamaIndex embedding providers.""" + +from dataclasses import dataclass +from typing import List, Set + + +@dataclass(frozen=True) +class VendorRule: + """Rule for detecting vendor from LlamaIndex class names.""" + + exact_matches: Set[str] + patterns: List[str] + vendor_name: str + + def matches(self, class_name: str) -> bool: + """Check if class name matches this vendor rule.""" + if class_name in self.exact_matches: + return True + class_lower = class_name.lower() + return any(pattern in class_lower for pattern in self.patterns) + + +def _get_vendor_rules() -> List[VendorRule]: + """ + Get vendor detection rules ordered by specificity (most specific first). 
+ + Returns: + List of VendorRule objects for detecting embedding vendors from class names + """ + return [ + VendorRule( + exact_matches={"AzureOpenAIEmbedding"}, + patterns=["azure"], + vendor_name="azure", + ), + VendorRule( + exact_matches={"OpenAIEmbedding"}, + patterns=["openai"], + vendor_name="openai", + ), + VendorRule( + exact_matches={"BedrockEmbedding"}, + patterns=["bedrock", "aws"], + vendor_name="aws", + ), + VendorRule( + exact_matches={"VertexTextEmbedding", "GeminiEmbedding", "GooglePaLMEmbedding"}, + patterns=["vertex", "google", "palm", "gemini"], + vendor_name="google", + ), + VendorRule( + exact_matches={"CohereEmbedding"}, + patterns=["cohere"], + vendor_name="cohere", + ), + VendorRule( + exact_matches={"HuggingFaceEmbedding", "HuggingFaceInferenceAPIEmbedding"}, + patterns=["huggingface"], + vendor_name="huggingface", + ), + VendorRule( + exact_matches={"OllamaEmbedding"}, + patterns=["ollama"], + vendor_name="ollama", + ), + VendorRule( + exact_matches={"AnthropicEmbedding"}, + patterns=["anthropic"], + vendor_name="anthropic", + ), + VendorRule( + exact_matches={"MistralAIEmbedding"}, + patterns=["mistral"], + vendor_name="mistralai", + ), + VendorRule( + exact_matches={"TogetherEmbedding"}, + patterns=["together"], + vendor_name="together", + ), + VendorRule( + exact_matches={"FireworksEmbedding"}, + patterns=["fireworks"], + vendor_name="fireworks", + ), + VendorRule( + exact_matches={"VoyageEmbedding"}, + patterns=["voyage"], + vendor_name="voyage", + ), + VendorRule( + exact_matches={"JinaEmbedding"}, + patterns=["jina"], + vendor_name="jina", + ), + ] + + +def detect_vendor_from_class(class_name: str) -> str: + """ + Detect vendor from LlamaIndex embedding class name. + Uses unified detection rules combining exact matches and patterns. 
+ + Args: + class_name: The class name from serialized embedding information + + Returns: + Vendor string (lowercase), defaults to None if no match found + """ + if not class_name: + return None + + vendor_rules = _get_vendor_rules() + + for rule in vendor_rules: + if rule.matches(class_name): + return rule.vendor_name + + return None diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py new file mode 100644 index 00000000..355a0570 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py @@ -0,0 +1,151 @@ +"""Test embedding instrumentation for LlamaIndex.""" + +import os + +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager +from llama_index.embeddings.openai import OpenAIEmbedding +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + + +# Global setup - shared across tests +metric_reader = None +instrumentor = None + + +def setup_telemetry(): + """Setup OpenTelemetry with span and metric exporters (once).""" + global metric_reader, instrumentor + + if metric_reader is not None: + return metric_reader + + # Enable metrics + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation once + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=metrics.get_meter_provider(), + ) + + return metric_reader + + +def test_embedding_single_text(): + """Test single text embedding instrumentation.""" + print("\nTest: Single Text Embedding") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate single embedding + text = "LlamaIndex is a data framework for LLM applications" + embedding = embed_model.get_text_embedding(text) + + print(f"\nText: {text}") + print(f"Embedding dimension: {len(embedding)}") + print(f"First 5 values: {embedding[:5]}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: 
{data_point.value}") + + print("\nTest completed successfully") + + +def test_embedding_batch(): + """Test batch embedding instrumentation.""" + print("\nTest: Batch Embeddings") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate batch embeddings + texts = [ + "Paris is the capital of France", + "Berlin is the capital of Germany", + "Rome is the capital of Italy", + ] + embeddings = embed_model.get_text_embedding_batch(texts) + + print(f"\nEmbedded {len(embeddings)} texts") + print(f"Dimension: {len(embeddings[0])}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: {data_point.value}") + + print("\nTest completed successfully") + + +if __name__ == "__main__": + test_embedding_single_text() + print("\n" + "=" * 60 + "\n") + test_embedding_batch() + + # Cleanup + if instrumentor: + instrumentor.uninstrument() From 5ed2d858ebf8fe33d0e897ed35537b889de79810 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 25 Nov 2025 15:31:30 -0800 Subject: [PATCH 04/14] Adding temporary RAG instrumentation solution for llamaindex --- .../llamaindex/callback_handler.py | 280 +++++++++++++++++- .../tests/test_rag.py | 97 ++++++ 2 files changed, 373 insertions(+), 4 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index fe9a1a1f..a070e06d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -2,14 +2,15 @@ from llama_index.core.callbacks.base_handler import BaseCallbackHandler from llama_index.core.callbacks.schema import CBEventType - from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( EmbeddingInvocation, InputMessage, LLMInvocation, OutputMessage, + Step, Text, + Workflow, ) from .vendor_detection import detect_vendor_from_class @@ -35,6 +36,7 @@ def __init__( event_ends_to_ignore=[], ) self._handler = telemetry_handler + self._auto_workflow_id: Optional[str] = None # Track auto-created workflow def start_trace(self, trace_id: Optional[str] = None) -> None: """Start a trace - required by BaseCallbackHandler.""" @@ -48,6 +50,17 @@ def end_trace( """End a trace - required by BaseCallbackHandler.""" pass + def _get_parent_span(self, parent_id: str) -> Optional[Any]: + """Get parent span from handler's registry using parent_id.""" + if not self._handler or not 
parent_id: + return None + # Get the parent entity from handler's registry + parent_entity = self._handler.get_entity(parent_id) + if parent_entity: + # Return the span attribute if it exists + return getattr(parent_entity, "span", None) + return None + def on_event_start( self, event_type: CBEventType, @@ -56,11 +69,17 @@ def on_event_start( parent_id: str = "", **kwargs: Any, ) -> str: - """Handle event start - processing LLM and EMBEDDING events.""" + """Handle event start - processing LLM, EMBEDDING, QUERY, RETRIEVE, and SYNTHESIZE events.""" if event_type == CBEventType.LLM: self._handle_llm_start(event_id, parent_id, payload, **kwargs) elif event_type == CBEventType.EMBEDDING: self._handle_embedding_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.QUERY: + self._handle_query_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.RETRIEVE: + self._handle_retrieve_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.SYNTHESIZE: + self._handle_synthesize_start(event_id, parent_id, payload, **kwargs) return event_id def on_event_end( @@ -70,11 +89,17 @@ def on_event_end( event_id: str = "", **kwargs: Any, ) -> None: - """Handle event end - processing LLM and EMBEDDING events.""" + """Handle event end - processing LLM, EMBEDDING, QUERY, RETRIEVE, and SYNTHESIZE events.""" if event_type == CBEventType.LLM: self._handle_llm_end(event_id, payload, **kwargs) elif event_type == CBEventType.EMBEDDING: self._handle_embedding_end(event_id, payload, **kwargs) + elif event_type == CBEventType.QUERY: + self._handle_query_end(event_id, payload, **kwargs) + elif event_type == CBEventType.RETRIEVE: + self._handle_retrieve_end(event_id, payload, **kwargs) + elif event_type == CBEventType.SYNTHESIZE: + self._handle_synthesize_end(event_id, payload, **kwargs) def _handle_llm_start( self, @@ -143,12 +168,18 @@ def _handle_llm_start( input_messages=input_messages, attributes={}, run_id=event_id, # Use event_id as run_id for registry lookup + parent_run_id=parent_id if parent_id else None, # Set parent for hierarchy ) llm_inv.framework = "llamaindex" + + # Resolve parent_id to parent_span for proper span context + parent_span = self._get_parent_span(parent_id) + if parent_span: + llm_inv.parent_span = parent_span # type: ignore[attr-defined] # Start the LLM invocation (handler stores it in _entity_registry) self._handler.start_llm(llm_inv) - + def _handle_llm_end( self, event_id: str, @@ -261,8 +292,14 @@ def _handle_embedding_start( provider=provider, attributes={}, run_id=event_id, + parent_run_id=parent_id if parent_id else None, # Set parent for hierarchy ) emb_inv.framework = "llamaindex" + + # Resolve parent_id to parent_span for proper span context + parent_span = self._get_parent_span(parent_id) + if parent_span: + emb_inv.parent_span = parent_span # type: ignore[attr-defined] # Start the embedding invocation self._handler.start_embedding(emb_inv) @@ -303,3 +340,238 @@ def _handle_embedding_end( # Stop the embedding invocation self._handler.stop_embedding(emb_inv) + + def _handle_query_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle query pipeline start - create Workflow if no parent, else Step.""" + if not self._handler or not payload: + return + + query_str = payload.get("query_str", "") + + # If no parent, this is the root workflow + if not parent_id: + workflow = Workflow( + name="llama_index_query_pipeline", + 
workflow_type="workflow", + initial_input=_safe_str(query_str), + attributes={}, + run_id=event_id, + ) + workflow.framework = "llamaindex" + self._handler.start_workflow(workflow) + + def _handle_query_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle query pipeline end.""" + if not self._handler: + return + + entity = self._handler.get_entity(event_id) + if not entity: + return + + if isinstance(entity, Workflow): + if payload: + response = payload.get("response") + if response: + # Extract response text + response_text = "" + if isinstance(response, dict): + response_text = response.get("response", "") + elif hasattr(response, "response"): + response_text = getattr(response, "response", "") + entity.final_output = _safe_str(response_text) + self._handler.stop_workflow(entity) + elif isinstance(entity, Step): + if payload: + response = payload.get("response") + if response: + response_text = "" + if isinstance(response, dict): + response_text = response.get("response", "") + elif hasattr(response, "response"): + response_text = getattr(response, "response", "") + entity.output_data = _safe_str(response_text) + self._handler.stop_step(entity) + + def _handle_retrieve_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle retrieval start - create Step for retrieve task.""" + if not self._handler or not payload: + return + + query_str = payload.get("query_str", "") + + # If parent_id doesn't exist or doesn't resolve to a tracked entity, + # create a root Workflow to hold the RAG steps + parent_entity = self._handler.get_entity(parent_id) if parent_id else None + + if not parent_entity: + # No valid parent - create auto-workflow + workflow_id = f"{event_id}_workflow" + workflow = Workflow( + name="llama_index_rag", + workflow_type="rag", + initial_input=_safe_str(query_str), + attributes={}, + run_id=workflow_id, + ) + workflow.framework = "llamaindex" + self._handler.start_workflow(workflow) + # Track this auto-created workflow + self._auto_workflow_id = workflow_id + # Get the workflow's span to use as parent + workflow_entity = self._handler.get_entity(workflow_id) + if workflow_entity: + parent_span = getattr(workflow_entity, "span", None) + else: + parent_span = None + else: + # Valid parent exists - resolve to parent_span + parent_span = self._get_parent_span(parent_id) + + # Create a step for the retrieval task + step = Step( + name="retrieve.task", + step_type="retrieve", + objective="Retrieve relevant documents", + input_data=_safe_str(query_str), + run_id=event_id, + parent_run_id=parent_id if parent_id else None, + attributes={}, + ) + + # Set parent_span if we have one + if parent_span: + step.parent_span = parent_span # type: ignore[attr-defined] + + self._handler.start_step(step) + + def _handle_retrieve_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle retrieval end - update step with retrieved nodes.""" + if not self._handler: + return + + step = self._handler.get_entity(event_id) + if not step or not isinstance(step, Step): + return + + if payload: + nodes = payload.get("nodes", []) + if nodes: + # Store document count and scores + step.attributes["retrieve.documents_count"] = len(nodes) + scores = [] + doc_ids = [] + for node in nodes: + if hasattr(node, "score") and node.score is not None: + scores.append(node.score) + if hasattr(node, "node_id"): + 
doc_ids.append(str(node.node_id)) + elif hasattr(node, "id_"): + doc_ids.append(str(node.id_)) + + if scores: + step.attributes["retrieve.scores"] = scores + if doc_ids: + step.attributes["retrieve.document_ids"] = doc_ids + + # Create output summary + step.output_data = f"Retrieved {len(nodes)} documents" + + self._handler.stop_step(step) + + def _handle_synthesize_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle synthesis start - create Step for synthesize task.""" + if not self._handler or not payload: + return + + query_str = payload.get("query_str", "") + + # Create a step for the synthesis task + step = Step( + name="synthesize.task", + step_type="synthesize", + objective="Synthesize response from retrieved documents", + input_data=_safe_str(query_str), + run_id=event_id, + parent_run_id=parent_id if parent_id else None, + attributes={}, + ) + + # Resolve parent_id to parent_span for proper span context + parent_span = self._get_parent_span(parent_id) + if parent_span: + step.parent_span = parent_span # type: ignore[attr-defined] + + self._handler.start_step(step) + + def _handle_synthesize_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle synthesis end - update step with synthesized response.""" + if not self._handler: + return + + step = self._handler.get_entity(event_id) + if not step or not isinstance(step, Step): + return + + if payload: + response = payload.get("response") + if response: + # Extract response text + response_text = "" + if isinstance(response, dict): + response_text = response.get("response", "") + elif hasattr(response, "response"): + response_text = getattr(response, "response", "") + step.output_data = _safe_str(response_text) + + self._handler.stop_step(step) + + # If we auto-created a workflow, close it after synthesize completes + if self._auto_workflow_id: + workflow = self._handler.get_entity(self._auto_workflow_id) + if workflow and isinstance(workflow, Workflow): + # Set final output from synthesize response + if payload: + response = payload.get("response") + if response: + response_text = "" + if isinstance(response, dict): + response_text = response.get("response", "") + elif hasattr(response, "response"): + response_text = getattr(response, "response", "") + workflow.final_output = _safe_str(response_text) + self._handler.stop_workflow(workflow) + self._auto_workflow_id = None # Reset for next query diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py new file mode 100644 index 00000000..fb8b7180 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py @@ -0,0 +1,97 @@ +""" +Test LlamaIndex RAG instrumentation without agents. + +This test validates that: +1. QUERY events create Workflow spans at the root level +2. RETRIEVE events create Step spans with parent_run_id pointing to the Workflow +3. SYNTHESIZE events create Step spans with parent_run_id pointing to the Workflow +4. LLM invocations nest under their Step parent via parent_run_id +5. 
Embedding invocations nest under their Step parent via parent_run_id +""" + +from llama_index.core import Document, Settings, VectorStoreIndex +from llama_index.embeddings.openai import OpenAIEmbedding +from llama_index.llms.openai import OpenAI +from opentelemetry import trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + + +def setup_telemetry(): + """Setup OpenTelemetry with console exporter to see trace structure.""" + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + return tracer_provider + + +def test_rag_without_agents(): + """Test RAG instrumentation creates correct hierarchy: Workflow -> Steps -> LLM/Embedding""" + + print("=" * 80) + print("Setting up telemetry...") + print("=" * 80) + setup_telemetry() + + # Setup LlamaIndex + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.1) + Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small") + + # Instrument + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument() + + # Debug: Check callback handler + from llama_index.core import Settings as LlamaSettings + print(f"\nCallbacks registered: {len(LlamaSettings.callback_manager.handlers)}") + for handler in LlamaSettings.callback_manager.handlers: + print(f" Handler: {type(handler).__name__}") + + # Create sample documents + documents = [ + Document( + text="Paris is the capital of France. It has a population of over 2 million.", + metadata={"source": "geography", "country": "France"}, + ), + Document( + text="The Eiffel Tower is in Paris. 
It was completed in 1889.", + metadata={"source": "landmarks", "country": "France"}, + ), + ] + + print("\n" + "=" * 80) + print("Creating vector index (should see Embedding spans)...") + print("=" * 80) + index = VectorStoreIndex.from_documents(documents) + + print("\n" + "=" * 80) + print("Creating query engine...") + print("=" * 80) + query_engine = index.as_query_engine(similarity_top_k=2) + + print("\n" + "=" * 80) + print("Executing RAG query (should see Workflow -> retrieve.task/synthesize.task -> LLM/Embedding)...") + print("=" * 80) + response = query_engine.query("What is the capital of France?") + + print("\n" + "=" * 80) + print("RESULTS") + print("=" * 80) + print(f"Response: {response.response}") + print(f"Source nodes: {len(response.source_nodes)}") + + print("\n" + "=" * 80) + print("✓ Test completed!") + print("=" * 80) + print("\nExpected trace structure:") + print(" Workflow (gen_ai.operation.name=query)") + print(" ├─ Step (gen_ai.operation.name=retrieve.task)") + print(" │ └─ EmbeddingInvocation") + print(" └─ Step (gen_ai.operation.name=synthesize.task)") + print(" └─ LLMInvocation") + print("=" * 80) + + +if __name__ == "__main__": + test_rag_without_agents() From b0b94d6dc034281032780737bcee58859fb3250e Mon Sep 17 00:00:00 2001 From: JWinermaSplunk Date: Thu, 4 Dec 2025 15:09:38 -0800 Subject: [PATCH 05/14] initial commit --- .../examples/retrievals_example.py | 417 ++++++++++++++++++ .../opentelemetry/util/genai/attributes.py | 7 + .../util/genai/emitters/metrics.py | 48 ++ .../opentelemetry/util/genai/emitters/span.py | 85 +++- .../src/opentelemetry/util/genai/handler.py | 71 +++ .../opentelemetry/util/genai/instruments.py | 5 + .../src/opentelemetry/util/genai/types.py | 36 ++ .../tests/test_retrieval_invocation.py | 406 +++++++++++++++++ 8 files changed, 1072 insertions(+), 3 deletions(-) create mode 100644 util/opentelemetry-util-genai/examples/retrievals_example.py create mode 100644 util/opentelemetry-util-genai/tests/test_retrieval_invocation.py diff --git a/util/opentelemetry-util-genai/examples/retrievals_example.py b/util/opentelemetry-util-genai/examples/retrievals_example.py new file mode 100644 index 00000000..0e6b8833 --- /dev/null +++ b/util/opentelemetry-util-genai/examples/retrievals_example.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 +"""Example demonstrating OpenTelemetry GenAI telemetry for retrieval operations. + +This example shows: +1. Basic retrieval invocation lifecycle +2. Retrieval with vector search +3. Retrieval with text query and metadata +4. Retrieval with custom attributes +5. Error handling for retrieval operations +6. Retrieval with agent context +7. 
Metrics and span emission for retrievals +""" + +import time + +from opentelemetry import _logs as logs +from opentelemetry import trace +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + ConsoleLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import Error, RetrievalInvocation + + +def setup_telemetry(): + """Set up OpenTelemetry providers for tracing, metrics, and logging.""" + # Set up tracing + trace_provider = TracerProvider() + trace_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + trace.set_tracer_provider(trace_provider) + + # Set up metrics + metric_reader = PeriodicExportingMetricReader( + ConsoleMetricExporter(), export_interval_millis=5000 + ) + meter_provider = MeterProvider(metric_readers=[metric_reader]) + + # Set up logging (for events) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(ConsoleLogExporter()) + ) + logs.set_logger_provider(logger_provider) + + return trace_provider, meter_provider, logger_provider + + +def example_basic_retrieval(): + """Example 1: Basic retrieval invocation with text query.""" + print("\n" + "=" * 60) + print("Example 1: Basic Retrieval Invocation") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval invocation + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="What is OpenTelemetry?", + top_k=5, + retriever_type="vector_store", + provider="pinecone", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.05) # Simulate API call + + # Simulate response - populate results + retrieval.documents_retrieved = 5 + retrieval.results = [ + {"id": "doc1", "score": 0.95, "content": "OpenTelemetry is..."}, + {"id": "doc2", "score": 0.89, "content": "OTEL provides..."}, + {"id": "doc3", "score": 0.85, "content": "Observability with..."}, + {"id": "doc4", "score": 0.82, "content": "Tracing and metrics..."}, + {"id": "doc5", "score": 0.78, "content": "Distributed tracing..."}, + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval for text query") + print(f" Query: {retrieval.query}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + print(f" Provider: {retrieval.provider}") + + +def example_vector_search(): + """Example 2: Retrieval with vector search.""" + print("\n" + "=" * 60) + print("Example 2: Vector Search Retrieval") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with query vector + query_vector = [0.1, 0.2, 0.3, 0.4, 0.5] * 100 # 500-dim vector + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query_vector=query_vector, + top_k=10, + retriever_type="vector_store", + provider="chroma", + framework="langchain", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.08) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 10 + retrieval.results = [ + {"id": f"doc{i}", "score": 0.95 - i * 0.05} + for i in range(10) + ] + + # Finish the retrieval 
operation + handler.stop_retrieval(retrieval) + + print("✓ Completed vector search retrieval") + print(f" Vector dimensions: {len(query_vector)}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + print(f" Framework: {retrieval.framework}") + + +def example_retrieval_with_metadata(): + """Example 3: Retrieval with result metadata.""" + print("\n" + "=" * 60) + print("Example 3: Retrieval with Metadata") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="machine learning tutorials", + top_k=3, + retriever_type="hybrid_search", + provider="weaviate", + framework="langchain", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.06) # Simulate API call + + # Simulate response with metadata + retrieval.documents_retrieved = 3 + retrieval.results = [ + { + "id": "tut1", + "score": 0.92, + "content": "Intro to ML", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + { + "id": "tut2", + "score": 0.88, + "content": "Python ML basics", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + { + "id": "tut3", + "score": 0.85, + "content": "Getting started with ML", + "metadata": {"category": "tutorial", "difficulty": "beginner"}, + }, + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with metadata") + print(f" Query: {retrieval.query}") + print(f" Retriever type: {retrieval.retriever_type}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def example_retrieval_with_custom_attributes(): + """Example 4: Retrieval with custom attributes.""" + print("\n" + "=" * 60) + print("Example 4: Retrieval with Custom Attributes") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with custom attributes + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="customer support documentation", + top_k=5, + retriever_type="semantic_search", + provider="qdrant", + attributes={ + "collection_name": "support_docs", + "user_id": "user-789", + "session_id": "session-456", + "search_type": "semantic", + }, + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.07) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 5 + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with custom attributes") + print(f" Query: {retrieval.query}") + print(f" Custom attributes: {retrieval.attributes}") + + +def example_retrieval_with_agent_context(): + """Example 5: Retrieval within an agent context.""" + print("\n" + "=" * 60) + print("Example 5: Retrieval with Agent Context") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval with agent context + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="latest product updates", + top_k=7, + retriever_type="vector_store", + provider="milvus", + framework="langchain", + agent_name="product_assistant", + agent_id="agent-123", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.05) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 7 + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed retrieval with agent context") + print(f" Agent: {retrieval.agent_name} (ID: {retrieval.agent_id})") + print(f" Query: 
{retrieval.query}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def example_retrieval_error(): + """Example 6: Handling retrieval errors.""" + print("\n" + "=" * 60) + print("Example 6: Retrieval Error Handling") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create retrieval invocation + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test query", + top_k=5, + retriever_type="vector_store", + provider="pinecone", + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.03) # Simulate API call + + # Simulate an error + error = Error( + message="Connection timeout to vector store", + type=TimeoutError, + ) + + # Fail the retrieval operation + handler.fail_retrieval(retrieval, error) + + print("✗ Retrieval failed with error") + print(f" Error: {error.message}") + print(f" Provider: {retrieval.provider}") + + +def example_multiple_retrievals(): + """Example 7: Multiple sequential retrievals.""" + print("\n" + "=" * 60) + print("Example 7: Multiple Sequential Retrievals") + print("=" * 60) + + handler = get_telemetry_handler() + + queries = [ + "What is machine learning?", + "How does deep learning work?", + "Explain neural networks", + ] + + for idx, query_text in enumerate(queries, 1): + retrieval = RetrievalInvocation( + operation_name="retrieval", + query=query_text, + top_k=5, + retriever_type="vector_store", + provider="pinecone", + attributes={"query_index": idx}, + ) + + handler.start_retrieval(retrieval) + time.sleep(0.04) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 5 + + handler.stop_retrieval(retrieval) + print(f" ✓ Completed retrieval {idx}/{len(queries)}") + + print(f"✓ Completed all {len(queries)} retrievals") + + +def example_hybrid_retrieval(): + """Example 8: Hybrid retrieval combining text and vector search.""" + print("\n" + "=" * 60) + print("Example 8: Hybrid Retrieval") + print("=" * 60) + + handler = get_telemetry_handler() + + # Create hybrid retrieval with both query and vector + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="artificial intelligence applications", + query_vector=[0.2] * 768, # 768-dim vector + top_k=8, + retriever_type="hybrid_search", + provider="elasticsearch", + framework="langchain", + attributes={ + "alpha": 0.5, # Balance between text and vector search + "boost_query": True, + }, + ) + + # Start the retrieval operation + handler.start_retrieval(retrieval) + time.sleep(0.09) # Simulate API call + + # Simulate response + retrieval.documents_retrieved = 8 + retrieval.results = [ + {"id": f"doc{i}", "score": 0.9 - i * 0.05, "type": "hybrid"} + for i in range(8) + ] + + # Finish the retrieval operation + handler.stop_retrieval(retrieval) + + print("✓ Completed hybrid retrieval") + print(f" Query: {retrieval.query}") + print(f" Vector dimensions: {len(retrieval.query_vector)}") + print(f" Retriever type: {retrieval.retriever_type}") + print(f" Documents retrieved: {retrieval.documents_retrieved}") + + +def main(): + """Run all retrieval examples.""" + print("\n" + "=" * 60) + print("OpenTelemetry GenAI Retrievals Examples") + print("=" * 60) + + # Set up telemetry + trace_provider, meter_provider, logger_provider = setup_telemetry() + + # Run examples + example_basic_retrieval() + example_vector_search() + example_retrieval_with_metadata() + example_retrieval_with_custom_attributes() + example_retrieval_with_agent_context() + example_retrieval_error() + example_multiple_retrievals() + 
example_hybrid_retrieval() + + # Force flush to ensure all telemetry is exported + print("\n" + "=" * 60) + print("Flushing telemetry data...") + print("=" * 60) + trace_provider.force_flush() + meter_provider.force_flush() + logger_provider.force_flush() + + print("\n✓ All examples completed successfully!") + print("Check the console output above for spans, metrics, and events.\n") + + +if __name__ == "__main__": + main() diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/attributes.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/attributes.py index 3c64da6e..a496582e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/attributes.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/attributes.py @@ -55,6 +55,13 @@ GEN_AI_EMBEDDINGS_INPUT_TEXTS = "gen_ai.embeddings.input.texts" GEN_AI_REQUEST_ENCODING_FORMATS = "gen_ai.request.encoding_formats" +# Retrieval attributes +GEN_AI_RETRIEVAL_TYPE = "gen_ai.retrieval.type" +GEN_AI_RETRIEVAL_QUERY_TEXT = "gen_ai.retrieval.query.text" +GEN_AI_RETRIEVAL_TOP_K = "gen_ai.retrieval.top_k" +GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED = "gen_ai.retrieval.documents_retrieved" +GEN_AI_RETRIEVAL_DOCUMENTS = "gen_ai.retrieval.documents" + # Server attributes (from semantic conventions) SERVER_ADDRESS = "server.address" SERVER_PORT = "server.port" diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py index f5291c1c..4988fc0d 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py @@ -16,6 +16,7 @@ from ..types import ( AgentInvocation, EmbeddingInvocation, + RetrievalInvocation, Error, LLMInvocation, ToolCall, @@ -50,6 +51,9 @@ def __init__(self, meter: Optional[Meter] = None): self._agent_duration_histogram: Histogram = ( instruments.agent_duration_histogram ) + self._retrieval_duration_histogram: Histogram = ( + instruments.retrieval_duration_histogram + ) def on_start(self, obj: Any) -> None: # no-op for metrics return None @@ -146,6 +150,9 @@ def on_end(self, obj: Any) -> None: span=getattr(embedding_invocation, "span", None), ) + if isinstance(obj, RetrievalInvocation): + self._record_retrieval_metrics(obj) + def on_error(self, error: Error, obj: Any) -> None: # Handle new agentic types if isinstance(obj, Workflow): @@ -242,6 +249,9 @@ def on_error(self, error: Error, obj: Any) -> None: span=getattr(embedding_invocation, "span", None), ) + if isinstance(obj, RetrievalInvocation): + self._record_retrieval_metrics(obj, error) + def handles(self, obj: Any) -> bool: return isinstance( obj, @@ -251,6 +261,7 @@ def handles(self, obj: Any) -> bool: Workflow, AgentInvocation, EmbeddingInvocation, + RetrievalInvocation, ), ) @@ -306,3 +317,40 @@ def _record_agent_metrics(self, agent: AgentInvocation) -> None: self._agent_duration_histogram.record( duration, attributes=metric_attrs, context=context ) + + def _record_retrieval_metrics( + self, retrieval: RetrievalInvocation, error: Optional[Error] = None + ) -> None: + """Record metrics for a retrieval operation.""" + if retrieval.end_time is None: + return + duration = retrieval.end_time - retrieval.start_time + metric_attrs = { + GenAI.GEN_AI_OPERATION_NAME: retrieval.operation_name, + } + if retrieval.retriever_type: + metric_attrs["gen_ai.retrieval.type"] = retrieval.retriever_type + if retrieval.framework: + 
metric_attrs["gen_ai.framework"] = retrieval.framework + if retrieval.provider: + metric_attrs[GenAI.GEN_AI_PROVIDER_NAME] = retrieval.provider + # Add agent context if available + if retrieval.agent_name: + metric_attrs[GenAI.GEN_AI_AGENT_NAME] = retrieval.agent_name + if retrieval.agent_id: + metric_attrs[GenAI.GEN_AI_AGENT_ID] = retrieval.agent_id + # Add error type if present + if error is not None and getattr(error, "type", None) is not None: + metric_attrs[ErrorAttributes.ERROR_TYPE] = error.type.__qualname__ + + context = None + span = getattr(retrieval, "span", None) + if span is not None: + try: + context = trace.set_span_in_context(span) + except (ValueError, RuntimeError): # pragma: no cover - defensive + context = None + + self._retrieval_duration_histogram.record( + duration, attributes=metric_attrs, context=context + ) \ No newline at end of file diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py index 5d1eab05..7961cd84 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py @@ -26,6 +26,10 @@ GEN_AI_OUTPUT_MESSAGES, GEN_AI_PROVIDER_NAME, GEN_AI_REQUEST_ENCODING_FORMATS, + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + GEN_AI_RETRIEVAL_QUERY_TEXT, + GEN_AI_RETRIEVAL_TOP_K, + GEN_AI_RETRIEVAL_TYPE, GEN_AI_STEP_ASSIGNED_AGENT, GEN_AI_STEP_NAME, GEN_AI_STEP_OBJECTIVE, @@ -45,6 +49,7 @@ AgentInvocation, ContentCapturingMode, EmbeddingInvocation, + RetrievalInvocation, Error, LLMInvocation, Step, @@ -201,9 +206,10 @@ def _apply_start_attrs(self, invocation: GenAIType): provider = getattr(invocation, "provider", None) if provider: span.set_attribute(GEN_AI_PROVIDER_NAME, provider) - # framework (named field) - if isinstance(invocation, LLMInvocation) and invocation.framework: - span.set_attribute("gen_ai.framework", invocation.framework) + # framework (named field) - applies to all invocation types + framework = getattr(invocation, "framework", None) + if framework: + span.set_attribute("gen_ai.framework", framework) # function definitions (semantic conv derived from structured list) if isinstance(invocation, LLMInvocation): _apply_function_definitions(span, invocation.request_functions) @@ -302,6 +308,8 @@ def on_start( self._apply_start_attrs(invocation) elif isinstance(invocation, EmbeddingInvocation): self._start_embedding(invocation) + elif isinstance(invocation, RetrievalInvocation): + self._start_retrieval(invocation) else: # Use operation field for span name (defaults to "chat") operation = getattr(invocation, "operation", "chat") @@ -335,6 +343,8 @@ def on_end(self, invocation: LLMInvocation | EmbeddingInvocation) -> None: self._finish_step(invocation) elif isinstance(invocation, EmbeddingInvocation): self._finish_embedding(invocation) + elif isinstance(invocation, RetrievalInvocation): + self._finish_retrieval(invocation) else: span = getattr(invocation, "span", None) if span is None: @@ -359,6 +369,8 @@ def on_error( self._error_step(error, invocation) elif isinstance(invocation, EmbeddingInvocation): self._error_embedding(error, invocation) + elif isinstance(invocation, RetrievalInvocation): + self._error_retrieval(error, invocation) else: span = getattr(invocation, "span", None) if span is None: @@ -771,3 +783,70 @@ def _error_embedding( token.__exit__(None, None, None) # type: ignore[misc] except Exception: pass + + # ---- Retrieval lifecycle 
--------------------------------------------- + def _start_retrieval(self, retrieval: RetrievalInvocation) -> None: + """Start a retrieval span.""" + span_name = f"{retrieval.operation_name}" + if retrieval.provider: + span_name = f"{retrieval.operation_name} {retrieval.provider}" + parent_span = getattr(retrieval, "parent_span", None) + parent_ctx = ( + trace.set_span_in_context(parent_span) + if parent_span is not None + else None + ) + cm = self._tracer.start_as_current_span( + span_name, + kind=SpanKind.CLIENT, + end_on_exit=False, + context=parent_ctx, + ) + span = cm.__enter__() + self._attach_span(retrieval, span, cm) + self._apply_start_attrs(retrieval) + + # Set retrieval-specific start attributes + if retrieval.top_k is not None: + span.set_attribute(GEN_AI_RETRIEVAL_TOP_K, retrieval.top_k) + if self._capture_content and retrieval.query: + span.set_attribute(GEN_AI_RETRIEVAL_QUERY_TEXT, retrieval.query) + + def _finish_retrieval(self, retrieval: RetrievalInvocation) -> None: + """Finish a retrieval span.""" + span = retrieval.span + if span is None: + return + # Apply finish-time semantic conventions + if retrieval.documents_retrieved is not None: + span.set_attribute( + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + retrieval.documents_retrieved, + ) + token = retrieval.context_token + if token is not None and hasattr(token, "__exit__"): + try: + token.__exit__(None, None, None) # type: ignore[misc] + except Exception: + pass + span.end() + + def _error_retrieval( + self, error: Error, retrieval: RetrievalInvocation + ) -> None: + """Fail a retrieval span with error status.""" + span = retrieval.span + if span is None: + return + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + token = retrieval.context_token + if token is not None and hasattr(token, "__exit__"): + try: + token.__exit__(None, None, None) # type: ignore[misc] + except Exception: + pass + span.end() \ No newline at end of file diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index b71b1ac0..dcaf76b0 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -87,6 +87,7 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None: # type: ignore AgentInvocation, ContentCapturingMode, EmbeddingInvocation, + RetrievalInvocation, Error, EvaluationResult, GenAI, @@ -475,6 +476,70 @@ def fail_embedding( pass return invocation + def start_retrieval( + self, invocation: RetrievalInvocation + ) -> RetrievalInvocation: + """Start a retrieval invocation and create a pending span entry.""" + self._refresh_capture_content() + if ( + not invocation.agent_name or not invocation.agent_id + ) and self._agent_context_stack: + top_name, top_id = self._agent_context_stack[-1] + if not invocation.agent_name: + invocation.agent_name = top_name + if not invocation.agent_id: + invocation.agent_id = top_id + invocation.start_time = time.time() + self._emitter.on_start(invocation) + span = getattr(invocation, "span", None) + if span is not None: + self._span_registry[str(invocation.run_id)] = span + self._entity_registry[str(invocation.run_id)] = invocation + return invocation + + def stop_retrieval( + self, invocation: RetrievalInvocation + ) -> RetrievalInvocation: + """Finalize a retrieval invocation successfully and end its 
span.""" + invocation.end_time = time.time() + + # Determine if this invocation should be sampled for evaluation + invocation.sample_for_evaluation = self._should_sample_for_evaluation( + invocation.trace_id + ) + + self._emitter.on_end(invocation) + self._notify_completion(invocation) + self._entity_registry.pop(str(invocation.run_id), None) + # Force flush metrics if a custom provider with force_flush is present + if ( + hasattr(self, "_meter_provider") + and self._meter_provider is not None + ): + try: # pragma: no cover + self._meter_provider.force_flush() # type: ignore[attr-defined] + except Exception: + pass + return invocation + + def fail_retrieval( + self, invocation: RetrievalInvocation, error: Error + ) -> RetrievalInvocation: + """Fail a retrieval invocation and end its span with error status.""" + invocation.end_time = time.time() + self._emitter.on_error(error, invocation) + self._notify_completion(invocation) + self._entity_registry.pop(str(invocation.run_id), None) + if ( + hasattr(self, "_meter_provider") + and self._meter_provider is not None + ): + try: # pragma: no cover + self._meter_provider.force_flush() # type: ignore[attr-defined] + except Exception: + pass + return invocation + # ToolCall lifecycle -------------------------------------------------- def start_tool_call(self, invocation: ToolCall) -> ToolCall: """Start a tool call invocation and create a pending span entry.""" @@ -880,6 +945,8 @@ def start(self, obj: Any) -> Any: return self.start_llm(obj) if isinstance(obj, EmbeddingInvocation): return self.start_embedding(obj) + if isinstance(obj, RetrievalInvocation): + return self.start_retrieval(obj) if isinstance(obj, ToolCall): return self.start_tool_call(obj) return obj @@ -960,6 +1027,8 @@ def finish(self, obj: Any) -> Any: return self.stop_llm(obj) if isinstance(obj, EmbeddingInvocation): return self.stop_embedding(obj) + if isinstance(obj, RetrievalInvocation): + return self.stop_retrieval(obj) if isinstance(obj, ToolCall): return self.stop_tool_call(obj) return obj @@ -976,6 +1045,8 @@ def fail(self, obj: Any, error: Error) -> Any: return self.fail_llm(obj, error) if isinstance(obj, EmbeddingInvocation): return self.fail_embedding(obj, error) + if isinstance(obj, RetrievalInvocation): + return self.fail_retrieval(obj, error) if isinstance(obj, ToolCall): return self.fail_tool_call(obj, error) return obj diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py index fd7381c6..9d88d627 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -42,3 +42,8 @@ def __init__(self, meter: Meter): unit="s", description="Duration of agent operations", ) + self.retrieval_duration_histogram: Histogram = meter.create_histogram( + name="gen_ai.retrieval.duration", + unit="s", + description="Duration of retrieval operations", + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 12424b71..1181494c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -317,6 +317,42 @@ class EmbeddingInvocation(GenAI): ) error_type: Optional[str] = None +@dataclass +class RetrievalInvocation(GenAI): + """Represents a single retrieval/search invocation.""" + + 
#Required attribute + operation_name: str = field( + default="retrieval", + metadata={"semconv": GenAIAttributes.GEN_AI_OPERATION_NAME}, + ) + + # Recommended attributes + retriever_type: Optional[str] = field( + default=None, + metadata={"semconv": "gen_ai.retrieval.type"}, + ) + query: Optional[str] = field( + default=None, + metadata={"semconv": "gen_ai.retrieval.query.text"}, + ) + top_k: Optional[int] = field( + default=None, + metadata={"semconv": "gen_ai.retrieval.top_k"}, + ) + documents_retrieved: Optional[int] = field( + default=None, + metadata={"semconv": "gen_ai.retrieval.documents_retrieved"}, + ) + + # Opt-in attribute + results: list[dict[str, Any]] = field( + default_factory=list, + metadata={"semconv": "gen_ai.retrieval.documents"}, + ) + + # Additional utility fields (not in semantic conventions) + query_vector: Optional[list[float]] = None @dataclass class Workflow(GenAI): diff --git a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py new file mode 100644 index 00000000..e63c26bf --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py @@ -0,0 +1,406 @@ +"""Tests for RetrievalInvocation lifecycle and telemetry.""" + +import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.util.genai.attributes import ( + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + GEN_AI_RETRIEVAL_QUERY_TEXT, + GEN_AI_RETRIEVAL_TOP_K, + GEN_AI_RETRIEVAL_TYPE, +) +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import Error, RetrievalInvocation + + +def test_retrieval_invocation_basic_lifecycle(): + """Test basic start/stop lifecycle for retrieval invocation.""" + handler = get_telemetry_handler() + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test query", + top_k=5, + retriever_type="vector_store", + provider="pinecone", + ) + + # Start should assign span + result = handler.start_retrieval(retrieval) + assert result is retrieval + assert retrieval.span is not None + assert retrieval.start_time is not None + + # Stop should set end_time and end span + retrieval.documents_retrieved = 5 + handler.stop_retrieval(retrieval) + assert retrieval.end_time is not None + assert retrieval.end_time >= retrieval.start_time + + +def test_retrieval_invocation_with_error(): + """Test error handling for retrieval invocation.""" + handler = get_telemetry_handler() + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="failing query", + top_k=10, + retriever_type="vector_store", + provider="chroma", + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + + # Fail the retrieval + error = Error(message="Connection timeout", type=TimeoutError) + handler.fail_retrieval(retrieval, error) + assert retrieval.end_time is not None + + +def test_retrieval_invocation_creates_span_with_attributes(): + """Test that retrieval invocation creates span with correct attributes.""" + # Set up in-memory span exporter + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + handler = get_telemetry_handler() + span_emitters = 
list(handler._emitter.emitters_for("span")) + if span_emitters: + span_emitters[0]._tracer = tracer_provider.get_tracer(__name__) + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="OpenTelemetry documentation", + top_k=7, + retriever_type="semantic_search", + provider="weaviate", + framework="langchain", + ) + + handler.start_retrieval(retrieval) + retrieval.documents_retrieved = 7 + handler.stop_retrieval(retrieval) + + # Get exported spans + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + attrs = span.attributes + + # Check required attributes + assert attrs[GenAI.GEN_AI_OPERATION_NAME] == "retrieval" + + # Check recommended attributes + assert attrs[GEN_AI_RETRIEVAL_TYPE] == "semantic_search" + assert attrs[GEN_AI_RETRIEVAL_TOP_K] == 7 + assert attrs[GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED] == 7 + + # Check provider and framework + assert attrs[GenAI.GEN_AI_PROVIDER_NAME] == "weaviate" + assert attrs.get("gen_ai.framework") == "langchain" + + +def test_retrieval_invocation_with_vector_search(): + """Test retrieval with query vector.""" + handler = get_telemetry_handler() + query_vector = [0.1, 0.2, 0.3] * 256 # 768-dim vector + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query_vector=query_vector, + top_k=10, + retriever_type="vector_store", + provider="pinecone", + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + assert retrieval.query_vector == query_vector + + retrieval.documents_retrieved = 10 + handler.stop_retrieval(retrieval) + assert retrieval.end_time is not None + + +def test_retrieval_invocation_with_hybrid_search(): + """Test retrieval with both text query and vector.""" + handler = get_telemetry_handler() + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="machine learning", + query_vector=[0.5] * 384, + top_k=15, + retriever_type="hybrid_search", + provider="elasticsearch", + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + assert retrieval.query == "machine learning" + assert len(retrieval.query_vector) == 384 + + retrieval.documents_retrieved = 15 + handler.stop_retrieval(retrieval) + + +def test_retrieval_invocation_with_agent_context(): + """Test retrieval within agent context.""" + handler = get_telemetry_handler() + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="product information", + top_k=5, + retriever_type="vector_store", + provider="milvus", + agent_name="product_assistant", + agent_id="agent-123", + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + assert retrieval.agent_name == "product_assistant" + assert retrieval.agent_id == "agent-123" + + retrieval.documents_retrieved = 5 + handler.stop_retrieval(retrieval) + + +def test_retrieval_invocation_with_custom_attributes(): + """Test retrieval with custom attributes.""" + handler = get_telemetry_handler() + + custom_attrs = { + "collection_name": "docs", + "user_id": "user-456", + "session_id": "session-789", + } + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="custom search", + top_k=3, + retriever_type="vector_store", + provider="qdrant", + attributes=custom_attrs, + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + assert retrieval.attributes == custom_attrs + + retrieval.documents_retrieved = 3 + handler.stop_retrieval(retrieval) + + +def test_retrieval_invocation_with_results(): + """Test retrieval with result documents.""" + 
handler = get_telemetry_handler() + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test", + top_k=2, + retriever_type="vector_store", + provider="pinecone", + ) + + handler.start_retrieval(retrieval) + + # Populate results + retrieval.documents_retrieved = 2 + retrieval.results = [ + {"id": "doc1", "score": 0.95, "content": "First document"}, + {"id": "doc2", "score": 0.87, "content": "Second document"}, + ] + + handler.stop_retrieval(retrieval) + assert len(retrieval.results) == 2 + assert retrieval.results[0]["score"] == 0.95 + + +def test_retrieval_invocation_semantic_convention_attributes(): + """Test that semantic convention attributes are properly extracted.""" + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="semantic test", + top_k=5, + retriever_type="vector_store", + provider="test_provider", + ) + + semconv_attrs = retrieval.semantic_convention_attributes() + + # Check that semantic convention attributes are present + assert GenAI.GEN_AI_OPERATION_NAME in semconv_attrs + assert semconv_attrs[GenAI.GEN_AI_OPERATION_NAME] == "retrieval" + assert "gen_ai.retrieval.type" in semconv_attrs + assert semconv_attrs["gen_ai.retrieval.type"] == "vector_store" + assert "gen_ai.retrieval.query.text" in semconv_attrs + assert semconv_attrs["gen_ai.retrieval.query.text"] == "semantic test" + assert "gen_ai.retrieval.top_k" in semconv_attrs + assert semconv_attrs["gen_ai.retrieval.top_k"] == 5 + + +def test_retrieval_invocation_minimal_required_fields(): + """Test retrieval with only required fields.""" + handler = get_telemetry_handler() + + # Only operation_name is required + retrieval = RetrievalInvocation( + operation_name="retrieval", + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + + handler.stop_retrieval(retrieval) + assert retrieval.end_time is not None + + +def test_retrieval_invocation_multiple_sequential(): + """Test multiple sequential retrieval invocations.""" + handler = get_telemetry_handler() + + queries = ["query1", "query2", "query3"] + retrievals = [] + + for query in queries: + retrieval = RetrievalInvocation( + operation_name="retrieval", + query=query, + top_k=5, + retriever_type="vector_store", + provider="pinecone", + ) + handler.start_retrieval(retrieval) + retrieval.documents_retrieved = 5 + handler.stop_retrieval(retrieval) + retrievals.append(retrieval) + + # All should have completed successfully + assert len(retrievals) == 3 + for retrieval in retrievals: + assert retrieval.span is not None + assert retrieval.end_time is not None + + +def test_generic_start_finish_for_retrieval(): + """Test generic handler methods route to retrieval lifecycle.""" + handler = get_telemetry_handler() + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="generic test", + top_k=5, + retriever_type="vector_store", + provider="test", + ) + + # Generic methods should route to retrieval lifecycle + handler.start(retrieval) + assert retrieval.span is not None + + handler.finish(retrieval) + assert retrieval.end_time is not None + + # Test fail path + retrieval2 = RetrievalInvocation( + operation_name="retrieval", + query="fail test", + top_k=3, + ) + handler.start(retrieval2) + handler.fail(retrieval2, Error(message="test error", type=RuntimeError)) + assert retrieval2.end_time is not None + + +def test_retrieval_invocation_span_name(): + """Test that span name is correctly formatted.""" + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + 
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + handler = get_telemetry_handler() + span_emitters = list(handler._emitter.emitters_for("span")) + if span_emitters: + span_emitters[0]._tracer = tracer_provider.get_tracer(__name__) + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test", + provider="pinecone", + ) + + handler.start_retrieval(retrieval) + handler.stop_retrieval(retrieval) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + # Span name should be "retrieval pinecone" + assert spans[0].name == "retrieval pinecone" + + +def test_retrieval_invocation_without_provider(): + """Test retrieval without provider (span name should be just operation).""" + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + handler = get_telemetry_handler() + span_emitters = list(handler._emitter.emitters_for("span")) + if span_emitters: + span_emitters[0]._tracer = tracer_provider.get_tracer(__name__) + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test", + ) + + handler.start_retrieval(retrieval) + handler.stop_retrieval(retrieval) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + # Span name should be just "retrieval" + assert spans[0].name == "retrieval" + + +@pytest.mark.parametrize( + "retriever_type,provider", + [ + ("vector_store", "pinecone"), + ("semantic_search", "weaviate"), + ("hybrid_search", "elasticsearch"), + ("keyword_search", "opensearch"), + ], +) +def test_retrieval_invocation_different_types(retriever_type, provider): + """Test retrieval with different retriever types and providers.""" + handler = get_telemetry_handler() + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query=f"test {retriever_type}", + top_k=5, + retriever_type=retriever_type, + provider=provider, + ) + + handler.start_retrieval(retrieval) + assert retrieval.span is not None + assert retrieval.retriever_type == retriever_type + assert retrieval.provider == provider + + retrieval.documents_retrieved = 5 + handler.stop_retrieval(retrieval) + assert retrieval.end_time is not None From 323707b771191fde0e16886cf40e2ea006ffb6d9 Mon Sep 17 00:00:00 2001 From: JWinermaSplunk Date: Fri, 5 Dec 2025 10:09:04 -0800 Subject: [PATCH 06/14] lint --- .../examples/retrievals_example.py | 3 +-- .../src/opentelemetry/util/genai/emitters/metrics.py | 4 ++-- .../src/opentelemetry/util/genai/emitters/span.py | 5 ++--- .../src/opentelemetry/util/genai/handler.py | 2 +- .../src/opentelemetry/util/genai/types.py | 10 ++++++---- .../tests/test_retrieval_invocation.py | 2 +- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/util/opentelemetry-util-genai/examples/retrievals_example.py b/util/opentelemetry-util-genai/examples/retrievals_example.py index 0e6b8833..fbbbe5c1 100644 --- a/util/opentelemetry-util-genai/examples/retrievals_example.py +++ b/util/opentelemetry-util-genai/examples/retrievals_example.py @@ -126,8 +126,7 @@ def example_vector_search(): # Simulate response retrieval.documents_retrieved = 10 retrieval.results = [ - {"id": f"doc{i}", "score": 0.95 - i * 0.05} - for i in range(10) + {"id": f"doc{i}", "score": 0.95 - i * 0.05} for i in range(10) ] # Finish the retrieval operation diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py index 
4988fc0d..c2f7ceb5 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/metrics.py @@ -16,9 +16,9 @@ from ..types import ( AgentInvocation, EmbeddingInvocation, - RetrievalInvocation, Error, LLMInvocation, + RetrievalInvocation, ToolCall, Workflow, ) @@ -353,4 +353,4 @@ def _record_retrieval_metrics( self._retrieval_duration_histogram.record( duration, attributes=metric_attrs, context=context - ) \ No newline at end of file + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py index 7961cd84..d9324a4f 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py @@ -29,7 +29,6 @@ GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, GEN_AI_RETRIEVAL_QUERY_TEXT, GEN_AI_RETRIEVAL_TOP_K, - GEN_AI_RETRIEVAL_TYPE, GEN_AI_STEP_ASSIGNED_AGENT, GEN_AI_STEP_NAME, GEN_AI_STEP_OBJECTIVE, @@ -49,9 +48,9 @@ AgentInvocation, ContentCapturingMode, EmbeddingInvocation, - RetrievalInvocation, Error, LLMInvocation, + RetrievalInvocation, Step, ToolCall, Workflow, @@ -849,4 +848,4 @@ def _error_retrieval( token.__exit__(None, None, None) # type: ignore[misc] except Exception: pass - span.end() \ No newline at end of file + span.end() diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index dcaf76b0..544593ea 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -87,11 +87,11 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None: # type: ignore AgentInvocation, ContentCapturingMode, EmbeddingInvocation, - RetrievalInvocation, Error, EvaluationResult, GenAI, LLMInvocation, + RetrievalInvocation, Step, ToolCall, Workflow, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 1181494c..b2984b2e 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -317,16 +317,17 @@ class EmbeddingInvocation(GenAI): ) error_type: Optional[str] = None + @dataclass class RetrievalInvocation(GenAI): """Represents a single retrieval/search invocation.""" - #Required attribute + # Required attribute operation_name: str = field( default="retrieval", metadata={"semconv": GenAIAttributes.GEN_AI_OPERATION_NAME}, ) - + # Recommended attributes retriever_type: Optional[str] = field( default=None, @@ -344,16 +345,17 @@ class RetrievalInvocation(GenAI): default=None, metadata={"semconv": "gen_ai.retrieval.documents_retrieved"}, ) - + # Opt-in attribute results: list[dict[str, Any]] = field( default_factory=list, metadata={"semconv": "gen_ai.retrieval.documents"}, ) - + # Additional utility fields (not in semantic conventions) query_vector: Optional[list[float]] = None + @dataclass class Workflow(GenAI): """Represents a workflow orchestrating multiple agents and steps. 
diff --git a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py index e63c26bf..18239c4e 100644 --- a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py +++ b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py @@ -1,6 +1,7 @@ """Tests for RetrievalInvocation lifecycle and telemetry.""" import pytest + from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( @@ -11,7 +12,6 @@ ) from opentelemetry.util.genai.attributes import ( GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, - GEN_AI_RETRIEVAL_QUERY_TEXT, GEN_AI_RETRIEVAL_TOP_K, GEN_AI_RETRIEVAL_TYPE, ) From 1157310536ec7420086c9f4875be9a2ff0736384 Mon Sep 17 00:00:00 2001 From: JWinermaSplunk Date: Fri, 5 Dec 2025 13:40:01 -0800 Subject: [PATCH 07/14] add additional attributes --- README.md | 30 +++++++- .../opentelemetry/util/genai/emitters/span.py | 9 +++ .../src/opentelemetry/util/genai/types.py | 7 ++ .../tests/test_retrieval_invocation.py | 75 +++++++++++++++++++ 4 files changed, 120 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dd29393e..939fb24d 100644 --- a/README.md +++ b/README.md @@ -291,7 +291,35 @@ inv.output_messages = [OutputMessage(role="assistant", parts=[Text("Hi!")], fini handler.stop_llm(inv) ``` -## 16. Validation Strategy +## 16. Resolving CI Lint Failures + +If the CI lint job fails on your PR, you can automatically fix all linting and formatting issues: + +```bash +make lint +``` + +This command will: +1. Install ruff 0.6.9 (the version used in CI) +2. Auto-fix all linting issues +3. Auto-format all code +4. Verify that all checks pass + +After running the command, commit and push the changes: + +```bash +git add . +git commit -m "fix: auto-fix linting issues" +git push +``` + +The CI lint job checks two things: +- **Linting**: `ruff check .` - checks for code quality issues +- **Formatting**: `ruff format --check .` - checks code formatting + +The `make lint` command fixes both automatically. + +## 17. Validation Strategy - Unit tests: env parsing, category overrides, evaluator grammar, sampling, content capture gating. - Future: ordering hints tests once implemented. 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py index d9324a4f..d69a9b37 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py @@ -806,6 +806,10 @@ def _start_retrieval(self, retrieval: RetrievalInvocation) -> None: self._apply_start_attrs(retrieval) # Set retrieval-specific start attributes + if retrieval.server_address: + span.set_attribute(SERVER_ADDRESS, retrieval.server_address) + if retrieval.server_port: + span.set_attribute(SERVER_PORT, retrieval.server_port) if retrieval.top_k is not None: span.set_attribute(GEN_AI_RETRIEVAL_TOP_K, retrieval.top_k) if self._capture_content and retrieval.query: @@ -842,6 +846,11 @@ def _error_retrieval( span.set_attribute( ErrorAttributes.ERROR_TYPE, error.type.__qualname__ ) + # Set error type from invocation if available + if retrieval.error_type: + span.set_attribute( + ErrorAttributes.ERROR_TYPE, retrieval.error_type + ) token = retrieval.context_token if token is not None and hasattr(token, "__exit__"): try: diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index b2984b2e..09b255ca 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -333,6 +333,10 @@ class RetrievalInvocation(GenAI): default=None, metadata={"semconv": "gen_ai.retrieval.type"}, ) + request_model: Optional[str] = field( + default=None, + metadata={"semconv": GenAIAttributes.GEN_AI_REQUEST_MODEL}, + ) query: Optional[str] = field( default=None, metadata={"semconv": "gen_ai.retrieval.query.text"}, @@ -354,6 +358,9 @@ class RetrievalInvocation(GenAI): # Additional utility fields (not in semantic conventions) query_vector: Optional[list[float]] = None + server_port: Optional[int] = None + server_address: Optional[str] = None + error_type: Optional[str] = None @dataclass diff --git a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py index 18239c4e..01a3d27f 100644 --- a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py +++ b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py @@ -232,6 +232,7 @@ def test_retrieval_invocation_semantic_convention_attributes(): """Test that semantic convention attributes are properly extracted.""" retrieval = RetrievalInvocation( operation_name="retrieval", + request_model="text-embedding-ada-002", query="semantic test", top_k=5, retriever_type="vector_store", @@ -243,6 +244,8 @@ def test_retrieval_invocation_semantic_convention_attributes(): # Check that semantic convention attributes are present assert GenAI.GEN_AI_OPERATION_NAME in semconv_attrs assert semconv_attrs[GenAI.GEN_AI_OPERATION_NAME] == "retrieval" + assert GenAI.GEN_AI_REQUEST_MODEL in semconv_attrs + assert semconv_attrs[GenAI.GEN_AI_REQUEST_MODEL] == "text-embedding-ada-002" assert "gen_ai.retrieval.type" in semconv_attrs assert semconv_attrs["gen_ai.retrieval.type"] == "vector_store" assert "gen_ai.retrieval.query.text" in semconv_attrs @@ -404,3 +407,75 @@ def test_retrieval_invocation_different_types(retriever_type, provider): retrieval.documents_retrieved = 5 handler.stop_retrieval(retrieval) assert retrieval.end_time is not None + + +def 
test_retrieval_invocation_with_server_and_model_attributes(): + """Test retrieval with server address, port, and model attributes.""" + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + handler = get_telemetry_handler() + span_emitters = list(handler._emitter.emitters_for("span")) + if span_emitters: + span_emitters[0]._tracer = tracer_provider.get_tracer(__name__) + + retrieval = RetrievalInvocation( + operation_name="retrieval", + request_model="text-embedding-ada-002", + query="test query", + top_k=5, + retriever_type="vector_store", + provider="weaviate", + server_address="localhost", + server_port=8080, + ) + + handler.start_retrieval(retrieval) + retrieval.documents_retrieved = 5 + handler.stop_retrieval(retrieval) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + attrs = span.attributes + + # Check new attributes + assert attrs[GenAI.GEN_AI_REQUEST_MODEL] == "text-embedding-ada-002" + assert attrs["server.address"] == "localhost" + assert attrs["server.port"] == 8080 + + +def test_retrieval_invocation_with_error_type(): + """Test retrieval with error_type attribute.""" + span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + handler = get_telemetry_handler() + span_emitters = list(handler._emitter.emitters_for("span")) + if span_emitters: + span_emitters[0]._tracer = tracer_provider.get_tracer(__name__) + + retrieval = RetrievalInvocation( + operation_name="retrieval", + query="test query", + top_k=5, + retriever_type="vector_store", + provider="pinecone", + error_type="ConnectionError", + ) + + handler.start_retrieval(retrieval) + error = Error(message="Connection failed", type=ConnectionError) + handler.fail_retrieval(retrieval, error) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + attrs = span.attributes + + # Check error type attribute (should be set from invocation.error_type) + assert attrs["error.type"] == "ConnectionError" From ab66267674ac1cebc385c36363d60148e5270a47 Mon Sep 17 00:00:00 2001 From: JWinermaSplunk Date: Fri, 5 Dec 2025 14:04:51 -0800 Subject: [PATCH 08/14] updates --- README.md | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/README.md b/README.md index 939fb24d..dd29393e 100644 --- a/README.md +++ b/README.md @@ -291,35 +291,7 @@ inv.output_messages = [OutputMessage(role="assistant", parts=[Text("Hi!")], fini handler.stop_llm(inv) ``` -## 16. Resolving CI Lint Failures - -If the CI lint job fails on your PR, you can automatically fix all linting and formatting issues: - -```bash -make lint -``` - -This command will: -1. Install ruff 0.6.9 (the version used in CI) -2. Auto-fix all linting issues -3. Auto-format all code -4. Verify that all checks pass - -After running the command, commit and push the changes: - -```bash -git add . -git commit -m "fix: auto-fix linting issues" -git push -``` - -The CI lint job checks two things: -- **Linting**: `ruff check .` - checks for code quality issues -- **Formatting**: `ruff format --check .` - checks code formatting - -The `make lint` command fixes both automatically. - -## 17. Validation Strategy +## 16. Validation Strategy - Unit tests: env parsing, category overrides, evaluator grammar, sampling, content capture gating. - Future: ordering hints tests once implemented. 
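The tests above exercise the server and error attributes introduced for `RetrievalInvocation`; the sketch below shows how an instrumentation might populate them around a real vector-store call. It is only a minimal illustration built from the fields and handler methods defined in this patch series (`request_model`, `server_address`, `server_port`, `error_type`, and `start_retrieval`/`stop_retrieval`/`fail_retrieval`); the provider, model name, endpoint, and `client.search` call are assumptions, not part of the patches.

```python
# Minimal sketch (not part of the patch): populating the new RetrievalInvocation
# fields from opentelemetry-util-genai around a hypothetical vector-store client.
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import Error, RetrievalInvocation


def query_vector_store(client, query: str, top_k: int = 5):
    handler = get_telemetry_handler()
    retrieval = RetrievalInvocation(
        operation_name="retrieval",
        query=query,
        top_k=top_k,
        retriever_type="vector_store",
        provider="pinecone",                     # assumed provider
        request_model="text-embedding-ada-002",  # assumed embedding model
        server_address="vector-db.internal",     # assumed endpoint
        server_port=8080,
    )
    handler.start_retrieval(retrieval)
    try:
        # Hypothetical client API; replace with the real vector-store call.
        results = client.search(query, top_k=top_k)
    except Exception as exc:
        # error_type is picked up by the span emitter in addition to Error.type.
        retrieval.error_type = type(exc).__qualname__
        handler.fail_retrieval(
            retrieval, Error(message=str(exc), type=type(exc))
        )
        raise
    retrieval.documents_retrieved = len(results)
    retrieval.results = [{"id": r.id, "score": r.score} for r in results]
    handler.stop_retrieval(retrieval)
    return results
```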
From 0dcd49042b3937c37e3f168fc1dc24ad93958567 Mon Sep 17 00:00:00 2001 From: JWinermaSplunk Date: Mon, 8 Dec 2025 10:47:56 -0800 Subject: [PATCH 09/14] updates --- README.md | 1 + util/opentelemetry-util-genai/CHANGELOG.md | 4 ++++ .../src/opentelemetry/util/genai/types.py | 1 + .../tests/test_retrieval_invocation.py | 4 +++- 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index dd29393e..4ab55cef 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Implemented dataclasses (in `types.py`): - `GenAI` - base class - `LLMInvocation` - `EmbeddingInvocation` +- `RetrievalInvocation` - `Workflow` - `AgentInvocation` - `Step` diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 0f947233..03e0065a 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this repository are documented in this file. +## Unreleased + +- Added `RetrievalInvocation` type to support retrieval/search operations in GenAI workflows + ## Version 0.1.4 - 2025-11-07 - Initial 0.1.4 release of splunk-otel-util-genai diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 09b255ca..8e6b8a9c 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -474,6 +474,7 @@ class Step(GenAI): "GenAI", "LLMInvocation", "EmbeddingInvocation", + "RetrievalInvocation", "Error", "EvaluationResult", # agentic AI types diff --git a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py index 01a3d27f..96341b09 100644 --- a/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py +++ b/util/opentelemetry-util-genai/tests/test_retrieval_invocation.py @@ -245,7 +245,9 @@ def test_retrieval_invocation_semantic_convention_attributes(): assert GenAI.GEN_AI_OPERATION_NAME in semconv_attrs assert semconv_attrs[GenAI.GEN_AI_OPERATION_NAME] == "retrieval" assert GenAI.GEN_AI_REQUEST_MODEL in semconv_attrs - assert semconv_attrs[GenAI.GEN_AI_REQUEST_MODEL] == "text-embedding-ada-002" + assert ( + semconv_attrs[GenAI.GEN_AI_REQUEST_MODEL] == "text-embedding-ada-002" + ) assert "gen_ai.retrieval.type" in semconv_attrs assert semconv_attrs["gen_ai.retrieval.type"] == "vector_store" assert "gen_ai.retrieval.query.text" in semconv_attrs From 00b1fe92839b090ad1e8d6bf661cff090433cc16 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 16 Dec 2025 05:40:17 -0800 Subject: [PATCH 10/14] Use RetrievalInvocation instead of Step for retrieval operations --- .../llamaindex/callback_handler.py | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index a070e06d..d829623c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -8,6 +8,7 @@ InputMessage, LLMInvocation, OutputMessage, + RetrievalInvocation, 
Step, Text, Workflow, @@ -411,14 +412,14 @@ def _handle_retrieve_start( payload: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: - """Handle retrieval start - create Step for retrieve task.""" + """Handle retrieval start - create RetrievalInvocation for retrieve task.""" if not self._handler or not payload: return query_str = payload.get("query_str", "") # If parent_id doesn't exist or doesn't resolve to a tracked entity, - # create a root Workflow to hold the RAG steps + # create a root Workflow to hold the RAG operations parent_entity = self._handler.get_entity(parent_id) if parent_id else None if not parent_entity: @@ -445,12 +446,11 @@ def _handle_retrieve_start( # Valid parent exists - resolve to parent_span parent_span = self._get_parent_span(parent_id) - # Create a step for the retrieval task - step = Step( - name="retrieve.task", - step_type="retrieve", - objective="Retrieve relevant documents", - input_data=_safe_str(query_str), + # Create a retrieval invocation for the retrieval task + retrieval = RetrievalInvocation( + operation_name="retrieve", + retriever_type="llamaindex_retriever", + query=_safe_str(query_str), run_id=event_id, parent_run_id=parent_id if parent_id else None, attributes={}, @@ -458,9 +458,9 @@ def _handle_retrieve_start( # Set parent_span if we have one if parent_span: - step.parent_span = parent_span # type: ignore[attr-defined] + retrieval.parent_span = parent_span # type: ignore[attr-defined] - self._handler.start_step(step) + self._handler.start_retrieval(retrieval) def _handle_retrieve_end( self, @@ -468,19 +468,21 @@ def _handle_retrieve_end( payload: Optional[Dict[str, Any]], **kwargs: Any, ) -> None: - """Handle retrieval end - update step with retrieved nodes.""" + """Handle retrieval end - update RetrievalInvocation with retrieved nodes.""" if not self._handler: return - step = self._handler.get_entity(event_id) - if not step or not isinstance(step, Step): + retrieval = self._handler.get_entity(event_id) + if not retrieval or not isinstance(retrieval, RetrievalInvocation): return if payload: nodes = payload.get("nodes", []) if nodes: - # Store document count and scores - step.attributes["retrieve.documents_count"] = len(nodes) + # Set document count + retrieval.document_count = len(nodes) + + # Store scores and document IDs as attributes scores = [] doc_ids = [] for node in nodes: @@ -492,14 +494,11 @@ def _handle_retrieve_end( doc_ids.append(str(node.id_)) if scores: - step.attributes["retrieve.scores"] = scores + retrieval.attributes["retrieve.scores"] = scores if doc_ids: - step.attributes["retrieve.document_ids"] = doc_ids - - # Create output summary - step.output_data = f"Retrieved {len(nodes)} documents" + retrieval.attributes["retrieve.document_ids"] = doc_ids - self._handler.stop_step(step) + self._handler.stop_retrieval(retrieval) def _handle_synthesize_start( self, From a24ea2299686ac0640a8de12b2c6fe320ac91ba6 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 16 Dec 2025 06:20:59 -0800 Subject: [PATCH 11/14] Apply ruff formatting to llamaindex instrumentation files --- .../instrumentation/llamaindex/__init__.py | 4 +- .../llamaindex/callback_handler.py | 60 ++++++++----------- .../llamaindex/vendor_detection.py | 6 +- .../tests/test_embedding_instrumentation.py | 6 +- .../tests/test_llm_instrumentation.py | 31 +++++----- .../tests/test_rag.py | 11 ++-- 6 files changed, 56 insertions(+), 62 deletions(-) diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py index c3bea984..c8c760e3 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -19,9 +19,7 @@ def __init__( super().__init__() Config._exception_logger = exception_logger Config.use_legacy_attributes = use_legacy_attributes - self._disable_trace_context_propagation = ( - disable_trace_context_propagation - ) + self._disable_trace_context_propagation = disable_trace_context_propagation self._telemetry_handler = None def instrumentation_dependencies(self): diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index d829623c..a68419bd 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -116,9 +116,7 @@ def _handle_llm_start( # Extract model information from payload serialized = payload.get("serialized", {}) model_name = ( - serialized.get("model") - or serialized.get("model_name") - or "unknown" + serialized.get("model") or serialized.get("model_name") or "unknown" ) # Extract messages from payload @@ -131,16 +129,12 @@ def _handle_llm_start( if hasattr(msg, "content") and hasattr(msg, "role"): # Extract role - could be MessageRole enum role_value = ( - str(msg.role.value) - if hasattr(msg.role, "value") - else str(msg.role) + str(msg.role.value) if hasattr(msg.role, "value") else str(msg.role) ) # Extract content - this is a property that pulls from blocks[0].text content = _safe_str(msg.content) input_messages.append( - InputMessage( - role=role_value, parts=[Text(content=content)] - ) + InputMessage(role=role_value, parts=[Text(content=content)]) ) elif isinstance(msg, dict): # Handle serialized messages (dict format) @@ -153,9 +147,7 @@ def _handle_llm_start( # Fallback to direct content field content = msg.get("content", "") - role_value = ( - str(role.value) if hasattr(role, "value") else str(role) - ) + role_value = str(role.value) if hasattr(role, "value") else str(role) input_messages.append( InputMessage( role=role_value, @@ -172,7 +164,7 @@ def _handle_llm_start( parent_run_id=parent_id if parent_id else None, # Set parent for hierarchy ) llm_inv.framework = "llamaindex" - + # Resolve parent_id to parent_span for proper span context parent_span = self._get_parent_span(parent_id) if parent_span: @@ -180,7 +172,7 @@ def _handle_llm_start( # Start the LLM invocation (handler stores it in _entity_registry) self._handler.start_llm(llm_inv) - + def _handle_llm_end( self, event_id: str, @@ -247,7 +239,7 @@ def _handle_llm_end( else: # It's an object, try to get usage attribute usage = getattr(raw_response, "usage", None) - + if usage: # usage could also be dict or object if isinstance(usage, dict): @@ -255,7 +247,9 @@ def _handle_llm_end( llm_inv.output_tokens = usage.get("completion_tokens") else: 
llm_inv.input_tokens = getattr(usage, "prompt_tokens", None) - llm_inv.output_tokens = getattr(usage, "completion_tokens", None) + llm_inv.output_tokens = getattr( + usage, "completion_tokens", None + ) # Stop the LLM invocation self._handler.stop_llm(llm_inv) @@ -274,9 +268,7 @@ def _handle_embedding_start( # Extract model information from payload serialized = payload.get("serialized", {}) model_name = ( - serialized.get("model_name") - or serialized.get("model") - or "unknown" + serialized.get("model_name") or serialized.get("model") or "unknown" ) # Detect provider from class name @@ -296,7 +288,7 @@ def _handle_embedding_start( parent_run_id=parent_id if parent_id else None, # Set parent for hierarchy ) emb_inv.framework = "llamaindex" - + # Resolve parent_id to parent_span for proper span context parent_span = self._get_parent_span(parent_id) if parent_span: @@ -326,11 +318,11 @@ def _handle_embedding_end( chunks = payload.get("chunks", []) if chunks: emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks] - + # Extract embedding vectors from response # embeddings is the list of output vectors embeddings = payload.get("embeddings", []) - + # Determine dimension from first embedding vector if embeddings and len(embeddings) > 0: first_embedding = embeddings[0] @@ -354,7 +346,7 @@ def _handle_query_start( return query_str = payload.get("query_str", "") - + # If no parent, this is the root workflow if not parent_id: workflow = Workflow( @@ -417,11 +409,11 @@ def _handle_retrieve_start( return query_str = payload.get("query_str", "") - + # If parent_id doesn't exist or doesn't resolve to a tracked entity, # create a root Workflow to hold the RAG operations parent_entity = self._handler.get_entity(parent_id) if parent_id else None - + if not parent_entity: # No valid parent - create auto-workflow workflow_id = f"{event_id}_workflow" @@ -445,7 +437,7 @@ def _handle_retrieve_start( else: # Valid parent exists - resolve to parent_span parent_span = self._get_parent_span(parent_id) - + # Create a retrieval invocation for the retrieval task retrieval = RetrievalInvocation( operation_name="retrieve", @@ -455,11 +447,11 @@ def _handle_retrieve_start( parent_run_id=parent_id if parent_id else None, attributes={}, ) - + # Set parent_span if we have one if parent_span: retrieval.parent_span = parent_span # type: ignore[attr-defined] - + self._handler.start_retrieval(retrieval) def _handle_retrieve_end( @@ -481,7 +473,7 @@ def _handle_retrieve_end( if nodes: # Set document count retrieval.document_count = len(nodes) - + # Store scores and document IDs as attributes scores = [] doc_ids = [] @@ -492,7 +484,7 @@ def _handle_retrieve_end( doc_ids.append(str(node.node_id)) elif hasattr(node, "id_"): doc_ids.append(str(node.id_)) - + if scores: retrieval.attributes["retrieve.scores"] = scores if doc_ids: @@ -512,7 +504,7 @@ def _handle_synthesize_start( return query_str = payload.get("query_str", "") - + # Create a step for the synthesis task step = Step( name="synthesize.task", @@ -523,12 +515,12 @@ def _handle_synthesize_start( parent_run_id=parent_id if parent_id else None, attributes={}, ) - + # Resolve parent_id to parent_span for proper span context parent_span = self._get_parent_span(parent_id) if parent_span: step.parent_span = parent_span # type: ignore[attr-defined] - + self._handler.start_step(step) def _handle_synthesize_end( @@ -557,7 +549,7 @@ def _handle_synthesize_end( step.output_data = _safe_str(response_text) self._handler.stop_step(step) - + # If we auto-created a workflow, close it 
after synthesize completes if self._auto_workflow_id: workflow = self._handler.get_entity(self._auto_workflow_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py index 6f9c9f06..3feeaee9 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py @@ -44,7 +44,11 @@ def _get_vendor_rules() -> List[VendorRule]: vendor_name="aws", ), VendorRule( - exact_matches={"VertexTextEmbedding", "GeminiEmbedding", "GooglePaLMEmbedding"}, + exact_matches={ + "VertexTextEmbedding", + "GeminiEmbedding", + "GooglePaLMEmbedding", + }, patterns=["vertex", "google", "palm", "gemini"], vendor_name="google", ), diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py index 355a0570..9828bb58 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py @@ -22,10 +22,10 @@ def setup_telemetry(): """Setup OpenTelemetry with span and metric exporters (once).""" global metric_reader, instrumentor - + if metric_reader is not None: return metric_reader - + # Enable metrics os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" @@ -145,7 +145,7 @@ def test_embedding_batch(): test_embedding_single_text() print("\n" + "=" * 60 + "\n") test_embedding_batch() - + # Cleanup if instrumentor: instrumentor.uninstrument() diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py index 50324c37..3081a15c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py @@ -21,9 +21,7 @@ def setup_telemetry(): # Setup tracing trace.set_tracer_provider(TracerProvider()) tracer_provider = trace.get_tracer_provider() - tracer_provider.add_span_processor( - SimpleSpanProcessor(ConsoleSpanExporter()) - ) + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) # Setup metrics with InMemoryMetricReader metric_reader = InMemoryMetricReader() @@ -43,12 +41,8 @@ def test_with_openai(): llm = OpenAI(model="gpt-3.5-turbo") messages = [ - ChatMessage( - role=MessageRole.SYSTEM, content="You are a helpful assistant." 
- ), - ChatMessage( - role=MessageRole.USER, content="Say hello in exactly 5 words" - ), + ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), + ChatMessage(role=MessageRole.USER, content="Say hello in exactly 5 words"), ] response = llm.chat(messages) @@ -70,7 +64,9 @@ def test_with_openai(): completion_tokens = getattr(usage, "completion_tokens", None) total_tokens = getattr(usage, "total_tokens", None) - print(f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}") + print( + f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}" + ) print("=" * 80) @@ -94,9 +90,7 @@ def test_with_mock(): llm = MockLLM(max_tokens=50) messages = [ - ChatMessage( - role=MessageRole.SYSTEM, content="You are a helpful assistant." - ), + ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"), ] @@ -132,8 +126,7 @@ def test_message_extraction(): # Instrument LlamaIndex instrumentor = LlamaindexInstrumentor() instrumentor.instrument( - tracer_provider=tracer_provider, - meter_provider=meter_provider + tracer_provider=tracer_provider, meter_provider=meter_provider ) print("LlamaIndex instrumentation enabled\n") @@ -174,8 +167,12 @@ def test_message_extraction(): found_token_usage = True dps = getattr(metric.data, "data_points", []) for dp in dps: - token_type = dp.attributes.get("gen_ai.token.type", "unknown") - print(f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}") + token_type = dp.attributes.get( + "gen_ai.token.type", "unknown" + ) + print( + f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}" + ) print("\n" + "=" * 80) status = [] diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py index fb8b7180..69af11d2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_rag.py @@ -4,7 +4,7 @@ This test validates that: 1. QUERY events create Workflow spans at the root level 2. RETRIEVE events create Step spans with parent_run_id pointing to the Workflow -3. SYNTHESIZE events create Step spans with parent_run_id pointing to the Workflow +3. SYNTHESIZE events create Step spans with parent_run_id pointing to the Workflow 4. LLM invocations nest under their Step parent via parent_run_id 5. 
Embedding invocations nest under their Step parent via parent_run_id """ @@ -28,7 +28,7 @@ def setup_telemetry(): def test_rag_without_agents(): """Test RAG instrumentation creates correct hierarchy: Workflow -> Steps -> LLM/Embedding""" - + print("=" * 80) print("Setting up telemetry...") print("=" * 80) @@ -41,9 +41,10 @@ def test_rag_without_agents(): # Instrument instrumentor = LlamaindexInstrumentor() instrumentor.instrument() - + # Debug: Check callback handler from llama_index.core import Settings as LlamaSettings + print(f"\nCallbacks registered: {len(LlamaSettings.callback_manager.handlers)}") for handler in LlamaSettings.callback_manager.handlers: print(f" Handler: {type(handler).__name__}") @@ -71,7 +72,9 @@ def test_rag_without_agents(): query_engine = index.as_query_engine(similarity_top_k=2) print("\n" + "=" * 80) - print("Executing RAG query (should see Workflow -> retrieve.task/synthesize.task -> LLM/Embedding)...") + print( + "Executing RAG query (should see Workflow -> retrieve.task/synthesize.task -> LLM/Embedding)..." + ) print("=" * 80) response = query_engine.query("What is the capital of France?") From 6cdc9e73d32cf0603a68d0d7b515ee19685df5a1 Mon Sep 17 00:00:00 2001 From: shuningc Date: Fri, 19 Dec 2025 06:37:37 -0800 Subject: [PATCH 12/14] Fixing document_count to documents_retrieved --- .../instrumentation/llamaindex/callback_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index a68419bd..03f225b5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -471,8 +471,8 @@ def _handle_retrieve_end( if payload: nodes = payload.get("nodes", []) if nodes: - # Set document count - retrieval.document_count = len(nodes) + # Set documents retrieved count + retrieval.documents_retrieved = len(nodes) # Store scores and document IDs as attributes scores = [] From 9c38b32f25260fd23270fcc05c2ce5ec5eaa6ac5 Mon Sep 17 00:00:00 2001 From: shuningc Date: Fri, 19 Dec 2025 06:58:15 -0800 Subject: [PATCH 13/14] Fixing lint --- .../src/opentelemetry/util/genai/emitters/span.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py index bddf3b14..66aca9cd 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py @@ -26,9 +26,6 @@ GEN_AI_OUTPUT_MESSAGES, GEN_AI_PROVIDER_NAME, GEN_AI_REQUEST_ENCODING_FORMATS, - GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, - GEN_AI_RETRIEVAL_QUERY_TEXT, - GEN_AI_RETRIEVAL_TOP_K, GEN_AI_STEP_ASSIGNED_AGENT, GEN_AI_STEP_NAME, GEN_AI_STEP_OBJECTIVE, From 205ba2ab20aac9fd635aee258c9fef1237a6bc1e Mon Sep 17 00:00:00 2001 From: shuningc Date: Fri, 19 Dec 2025 07:20:24 -0800 Subject: [PATCH 14/14] Recovering lost codes during merge --- .../opentelemetry/util/genai/emitters/span.py | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git 
a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py index 66aca9cd..fdeeac5d 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters/span.py @@ -26,6 +26,9 @@ GEN_AI_OUTPUT_MESSAGES, GEN_AI_PROVIDER_NAME, GEN_AI_REQUEST_ENCODING_FORMATS, + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + GEN_AI_RETRIEVAL_QUERY_TEXT, + GEN_AI_RETRIEVAL_TOP_K, GEN_AI_STEP_ASSIGNED_AGENT, GEN_AI_STEP_NAME, GEN_AI_STEP_OBJECTIVE, @@ -202,6 +205,10 @@ def _apply_start_attrs(self, invocation: GenAIType): provider = getattr(invocation, "provider", None) if provider: span.set_attribute(GEN_AI_PROVIDER_NAME, provider) + # framework (named field) - applies to all invocation types + framework = getattr(invocation, "framework", None) + if framework: + span.set_attribute("gen_ai.framework", framework) server_address = getattr(invocation, "server_address", None) if server_address: span.set_attribute(SERVER_ADDRESS, server_address) @@ -781,3 +788,78 @@ def _error_embedding( except Exception: pass span.end() + + def _start_retrieval(self, retrieval: RetrievalInvocation) -> None: + """Start a retrieval span.""" + span_name = f"{retrieval.operation_name}" + if retrieval.provider: + span_name = f"{retrieval.operation_name} {retrieval.provider}" + parent_span = getattr(retrieval, "parent_span", None) + parent_ctx = ( + trace.set_span_in_context(parent_span) + if parent_span is not None + else None + ) + cm = self._tracer.start_as_current_span( + span_name, + kind=SpanKind.CLIENT, + end_on_exit=False, + context=parent_ctx, + ) + span = cm.__enter__() + self._attach_span(retrieval, span, cm) + self._apply_start_attrs(retrieval) + + # Set retrieval-specific start attributes + if retrieval.server_address: + span.set_attribute(SERVER_ADDRESS, retrieval.server_address) + if retrieval.server_port: + span.set_attribute(SERVER_PORT, retrieval.server_port) + if retrieval.top_k is not None: + span.set_attribute(GEN_AI_RETRIEVAL_TOP_K, retrieval.top_k) + if self._capture_content and retrieval.query: + span.set_attribute(GEN_AI_RETRIEVAL_QUERY_TEXT, retrieval.query) + + def _finish_retrieval(self, retrieval: RetrievalInvocation) -> None: + """Finish a retrieval span.""" + span = retrieval.span + if span is None: + return + # Apply finish-time semantic conventions + if retrieval.documents_retrieved is not None: + span.set_attribute( + GEN_AI_RETRIEVAL_DOCUMENTS_RETRIEVED, + retrieval.documents_retrieved, + ) + token = retrieval.context_token + if token is not None and hasattr(token, "__exit__"): + try: + token.__exit__(None, None, None) # type: ignore[misc] + except Exception: + pass + span.end() + + def _error_retrieval( + self, error: Error, retrieval: RetrievalInvocation + ) -> None: + """Fail a retrieval span with error status.""" + span = retrieval.span + if span is None: + return + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + # Set error type from invocation if available + if retrieval.error_type: + span.set_attribute( + ErrorAttributes.ERROR_TYPE, retrieval.error_type + ) + token = retrieval.context_token + if token is not None and hasattr(token, "__exit__"): + try: + token.__exit__(None, None, None) # type: ignore[misc] + except Exception: + pass + span.end()
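
Usage sketch for the RetrievalInvocation flow introduced in this series. This is an illustrative, hedged example, not part of the diffs above: it assumes the telemetry handler exposes start_retrieval/stop_retrieval exactly as the LlamaIndex callback handler calls them in patch 10, that RetrievalInvocation accepts the keyword fields used there, and that the handler is obtained via get_telemetry_handler(); treat those names as assumptions rather than a documented public API.

    # Hedged sketch: drive a RetrievalInvocation directly, mirroring what
    # _handle_retrieve_start / _handle_retrieve_end do in patches 10 and 12.
    from opentelemetry.util.genai.handler import get_telemetry_handler  # assumed entry point
    from opentelemetry.util.genai.types import RetrievalInvocation

    handler = get_telemetry_handler()

    retrieval = RetrievalInvocation(
        operation_name="retrieve",                # same value _handle_retrieve_start uses
        retriever_type="llamaindex_retriever",
        query="What is the capital of France?",   # query text used in test_rag.py
        top_k=2,                                  # field read by the span emitter; assumed settable here
        attributes={},
    )
    handler.start_retrieval(retrieval)  # opens the retrieval span via the span emitter

    # ... perform the actual vector-store lookup here ...

    retrieval.documents_retrieved = 2                                   # field renamed in patch 12
    retrieval.attributes["retrieve.scores"] = [0.91, 0.87]              # illustrative values
    retrieval.attributes["retrieve.document_ids"] = ["doc-1", "doc-2"]  # illustrative values
    handler.stop_retrieval(retrieval)   # ends the span and records documents retrieved

Using a dedicated RetrievalInvocation instead of a generic Step is what lets the span emitter added back in patch 14 attach retrieval-specific attributes: the query text and top_k at start time, and the documents-retrieved count at finish time.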