From d411b688a0b0b6fab91eae24c8d95457ba0f9482 Mon Sep 17 00:00:00 2001 From: shuningc Date: Mon, 17 Nov 2025 18:36:42 -0800 Subject: [PATCH 1/8] Adding Llamaindex llm instrumentation spike --- .../pyproject.toml | 58 +++++ .../instrumentation/llamaindex/__init__.py | 67 ++++++ .../llamaindex/callback_handler.py | 222 ++++++++++++++++++ .../instrumentation/llamaindex/config.py | 3 + .../instrumentation/llamaindex/version.py | 1 + .../tests/test_llm_instrumentation.py | 190 +++++++++++++++ 6 files changed, 541 insertions(+) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml new file mode 100644 index 0000000..55a6708 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml @@ -0,0 +1,58 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "splunk-otel-instrumentation-llamaindex" +dynamic = ["version"] +description = "OpenTelemetry LlamaIndex instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.38.0.dev0", + "opentelemetry-instrumentation ~= 0.59b0.dev0", + "opentelemetry-semantic-conventions ~= 0.59b0.dev0", + "splunk-otel-util-genai>=0.1.4", +] + +[project.optional-dependencies] +instruments = ["llama-index-core >= 0.14.0"] +test = [ + "llama-index-core >= 0.14.0", + "llama-index-llms-openai >= 0.6.0", + "pytest >= 7.0.0", +] + +[project.entry-points.opentelemetry_instrumentor] +llamaindex = "opentelemetry.instrumentation.llamaindex:LlamaindexInstrumentor" + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-llamaindex" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/llamaindex/version.py" + +[tool.hatch.build.targets.sdist] +include = ["/src", "/tests"] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.ruff] +exclude = ["./"] diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py new file mode 100644 index 0000000..c3bea98 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -0,0 +1,67 @@ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.instrumentation.llamaindex.config import Config +from opentelemetry.instrumentation.llamaindex.callback_handler import ( + LlamaindexCallbackHandler, +) +from wrapt import wrap_function_wrapper + +_instruments = ("llama-index-core >= 0.14.0",) + + +class LlamaindexInstrumentor(BaseInstrumentor): + def __init__( + self, + exception_logger=None, + disable_trace_context_propagation=False, + use_legacy_attributes: bool = True, + ): + super().__init__() + Config._exception_logger = exception_logger + Config.use_legacy_attributes = use_legacy_attributes + self._disable_trace_context_propagation = ( + disable_trace_context_propagation + ) + self._telemetry_handler = None + + def instrumentation_dependencies(self): + return _instruments + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + self._telemetry_handler = get_telemetry_handler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) + + llamaindexCallBackHandler = LlamaindexCallbackHandler( + telemetry_handler=self._telemetry_handler + ) + + wrap_function_wrapper( + module="llama_index.core.callbacks.base", + name="CallbackManager.__init__", + wrapper=_BaseCallbackManagerInitWrapper(llamaindexCallBackHandler), + ) + + def _uninstrument(self, **kwargs): + pass + + +class _BaseCallbackManagerInitWrapper: + def __init__(self, callback_handler: "LlamaindexCallbackHandler"): + self._callback_handler = callback_handler + + def __call__(self, wrapped, instance, args, kwargs) -> None: + wrapped(*args, **kwargs) + # LlamaIndex uses 'handlers' instead of 'inheritable_handlers' + for handler in instance.handlers: + if isinstance(handler, type(self._callback_handler)): + break + else: + self._callback_handler._callback_manager = instance + instance.add_handler(self._callback_handler) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py new file mode 100644 index 0000000..7846cfb --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -0,0 +1,222 @@ +from typing import Any, Dict, Optional + +from llama_index.core.callbacks.base_handler import BaseCallbackHandler +from llama_index.core.callbacks.schema import CBEventType + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + InputMessage, + LLMInvocation, + OutputMessage, + Text, +) + + +def _safe_str(value: Any) -> str: + """Safely convert value to string.""" + try: + return str(value) + except (TypeError, ValueError): + return "" + + +class 
LlamaindexCallbackHandler(BaseCallbackHandler): + """Simplified LlamaIndex callback handler - LLM invocation only.""" + + def __init__( + self, + telemetry_handler: Optional[TelemetryHandler] = None, + ) -> None: + super().__init__( + event_starts_to_ignore=[], + event_ends_to_ignore=[], + ) + self._handler = telemetry_handler + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Start a trace - required by BaseCallbackHandler.""" + pass + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, Any]] = None, + ) -> None: + """End a trace - required by BaseCallbackHandler.""" + pass + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + """Handle event start - only processing LLM events.""" + if event_type == CBEventType.LLM: + self._handle_llm_start(event_id, parent_id, payload, **kwargs) + return event_id + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + """Handle event end - only processing LLM events.""" + if event_type == CBEventType.LLM: + self._handle_llm_end(event_id, payload, **kwargs) + + def _handle_llm_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle LLM invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model") + or serialized.get("model_name") + or "unknown" + ) + + # Extract messages from payload + # LlamaIndex messages are ChatMessage objects with .content and .role properties + messages = payload.get("messages", []) + input_messages = [] + + for msg in messages: + # Handle ChatMessage objects (has .content property and .role attribute) + if hasattr(msg, "content") and hasattr(msg, "role"): + # Extract role - could be MessageRole enum + role_value = ( + str(msg.role.value) + if hasattr(msg.role, "value") + else str(msg.role) + ) + # Extract content - this is a property that pulls from blocks[0].text + content = _safe_str(msg.content) + input_messages.append( + InputMessage( + role=role_value, parts=[Text(content=content)] + ) + ) + elif isinstance(msg, dict): + # Handle serialized messages (dict format) + role = msg.get("role", "user") + # Try to extract from blocks first (LlamaIndex format) + blocks = msg.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + # Fallback to direct content field + content = msg.get("content", "") + + role_value = ( + str(role.value) if hasattr(role, "value") else str(role) + ) + input_messages.append( + InputMessage( + role=role_value, + parts=[Text(content=_safe_str(content))], + ) + ) + + # Create LLM invocation with event_id as run_id + llm_inv = LLMInvocation( + request_model=_safe_str(model_name), + input_messages=input_messages, + attributes={}, + run_id=event_id, # Use event_id as run_id for registry lookup + ) + llm_inv.framework = "llamaindex" + + # Start the LLM invocation (handler stores it in _entity_registry) + self._handler.start_llm(llm_inv) + + def _handle_llm_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle LLM invocation end.""" + if not self._handler: + return + + # Get the LLM invocation from handler's 
registry using event_id + llm_inv = self._handler.get_entity(event_id) + if not llm_inv or not isinstance(llm_inv, LLMInvocation): + return + + if payload: + # Extract response from payload + response = payload.get("response") + + # Handle both dict and object types for response + if response: + # Get message - could be dict or object + if isinstance(response, dict): + message = response.get("message", {}) + raw_response = response.get("raw") + else: + # response is a ChatResponse object + message = getattr(response, "message", None) + raw_response = getattr(response, "raw", None) + + # Extract content from message + if message: + if isinstance(message, dict): + # Message is dict + blocks = message.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + content = message.get("content", "") + else: + # Message is ChatMessage object + blocks = getattr(message, "blocks", []) + if blocks and len(blocks) > 0: + content = getattr(blocks[0], "text", "") + else: + content = getattr(message, "content", "") + + # Create output message + llm_inv.output_messages = [ + OutputMessage( + role="assistant", + parts=[Text(content=_safe_str(content))], + finish_reason="stop", + ) + ] + + # Extract token usage from response.raw (OpenAI format) + # LlamaIndex stores the raw API response (e.g., OpenAI response) in response.raw + # raw_response could be a dict or an object (e.g., ChatCompletion from OpenAI) + if raw_response: + # Try to get usage from dict or object + if isinstance(raw_response, dict): + usage = raw_response.get("usage", {}) + else: + # It's an object, try to get usage attribute + usage = getattr(raw_response, "usage", None) + + if usage: + # usage could also be dict or object + if isinstance(usage, dict): + llm_inv.input_tokens = usage.get("prompt_tokens") + llm_inv.output_tokens = usage.get("completion_tokens") + else: + llm_inv.input_tokens = getattr(usage, "prompt_tokens", None) + llm_inv.output_tokens = getattr(usage, "completion_tokens", None) + + # Stop the LLM invocation + self._handler.stop_llm(llm_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py new file mode 100644 index 0000000..44199c0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py @@ -0,0 +1,3 @@ +class Config: + exception_logger = None + use_legacy_attributes = True diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py new file mode 100644 index 0000000..50324c3 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py @@ -0,0 +1,190 @@ +"""Tests for LlamaIndex LLM instrumentation with OpenTelemetry.""" + 
+import os + +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.llms.mock import MockLLM +from opentelemetry import metrics, trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + + +def setup_telemetry(): + """Setup OpenTelemetry with both trace and metrics exporters.""" + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + return tracer_provider, meter_provider, metric_reader + + +def test_with_openai(): + """Test with real OpenAI API - requires OPENAI_API_KEY environment variable.""" + from llama_index.llms.openai import OpenAI + + print("=" * 80) + print("Testing with OpenAI API") + print("=" * 80) + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." + ), + ChatMessage( + role=MessageRole.USER, content="Say hello in exactly 5 words" + ), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content}") + + if hasattr(response, "raw") and response.raw: + if isinstance(response.raw, dict): + usage = response.raw.get("usage", {}) + else: + usage = getattr(response.raw, "usage", None) + + if usage: + if isinstance(usage, dict): + prompt_tokens = usage.get("prompt_tokens") + completion_tokens = usage.get("completion_tokens") + total_tokens = usage.get("total_tokens") + else: + prompt_tokens = getattr(usage, "prompt_tokens", None) + completion_tokens = getattr(usage, "completion_tokens", None) + total_tokens = getattr(usage, "total_tokens", None) + + print(f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}") + + print("=" * 80) + + +class MockLLMWithUsage(MockLLM): + """MockLLM that includes fake usage data for testing.""" + + def _complete(self, prompt, **kwargs): + """Override internal complete to inject usage data.""" + response = super()._complete(prompt, **kwargs) + # Note: MockLLM uses _complete internally, but we can't easily inject + # usage here because the ChatResponse is created later + return response + + +def test_with_mock(): + """Test with MockLLM - no API key needed.""" + print("=" * 80) + print("Testing with MockLLM") + print("=" * 80) + + llm = MockLLM(max_tokens=50) + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." 
+ ), + ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:100]}...") + print("=" * 80) + + +def test_message_extraction(): + """Test message extraction.""" + print("\n" + "=" * 80) + print("Testing message extraction") + print("=" * 80) + + llm = MockLLM(max_tokens=20) + messages = [ + ChatMessage(role=MessageRole.SYSTEM, content="You are helpful."), + ChatMessage(role=MessageRole.USER, content="Test message"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:50]}...") + print("=" * 80) + + +if __name__ == "__main__": + # Enable metrics emission + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup telemetry + tracer_provider, meter_provider, metric_reader = setup_telemetry() + + # Instrument LlamaIndex + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + meter_provider=meter_provider + ) + print("LlamaIndex instrumentation enabled\n") + + # Run tests + if os.environ.get("OPENAI_API_KEY"): + print("Testing with real OpenAI API\n") + test_with_openai() + else: + print("Testing with MockLLM (set OPENAI_API_KEY to test real API)\n") + test_with_mock() + + # Test message extraction + test_message_extraction() + + # Check metrics + print("\n" + "=" * 80) + print("Metrics Summary") + print("=" * 80) + + metrics_data = metric_reader.get_metrics_data() + found_duration = False + found_token_usage = False + + if metrics_data: + for rm in getattr(metrics_data, "resource_metrics", []) or []: + for scope in getattr(rm, "scope_metrics", []) or []: + for metric in getattr(scope, "metrics", []) or []: + print(f"\nMetric: {metric.name}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION: + found_duration = True + dps = getattr(metric.data, "data_points", []) + if dps: + print(f" Duration: {dps[0].sum:.4f} seconds") + print(f" Count: {dps[0].count}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE: + found_token_usage = True + dps = getattr(metric.data, "data_points", []) + for dp in dps: + token_type = dp.attributes.get("gen_ai.token.type", "unknown") + print(f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}") + + print("\n" + "=" * 80) + status = [] + if found_duration: + status.append("Duration: OK") + if found_token_usage: + status.append("Token Usage: OK") + if not found_duration and not found_token_usage: + status.append("No metrics (use real API for metrics)") + + print("Status: " + " | ".join(status)) + print("=" * 80) From 79fd8aa889b1777c04c39dcceb2e180b8bb6e336 Mon Sep 17 00:00:00 2001 From: shuningc Date: Mon, 17 Nov 2025 18:43:00 -0800 Subject: [PATCH 2/8] Updating readme --- .../README.rst | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst new file mode 100644 index 0000000..5371d3c --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst @@ -0,0 +1,155 @@ +OpenTelemetry LlamaIndex Instrumentation +========================================= + +This library provides automatic instrumentation for LlamaIndex applications using OpenTelemetry. 
+ +Installation +------------ + +Development installation:: + + # Install the package in editable mode + cd instrumentation-genai/opentelemetry-instrumentation-llamaindex + pip install -e . + + # Install test dependencies + pip install -e ".[test]" + + # Install util-genai (required for telemetry) + cd ../../util/opentelemetry-util-genai + pip install -e . + + +Quick Start +----------- + +.. code-block:: python + + import os + from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import InMemoryMetricReader + + # Enable metrics (default is spans only) + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation with providers + LlamaindexInstrumentor().instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=meter_provider + ) + + # Use LlamaIndex as normal + from llama_index.llms.openai import OpenAI + from llama_index.core.llms import ChatMessage, MessageRole + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ChatMessage(role=MessageRole.USER, content="Hello")] + response = llm.chat(messages) + + +Running Tests +------------- + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_llm_instrumentation.py + + +Expected Output +--------------- + +**Span Attributes**:: + + { + "gen_ai.framework": "llamaindex", + "gen_ai.request.model": "gpt-3.5-turbo", + "gen_ai.operation.name": "chat", + "gen_ai.usage.input_tokens": 24, + "gen_ai.usage.output_tokens": 7 + } + +**Metrics**:: + + Metric: gen_ai.client.operation.duration + Duration: 0.6900 seconds + Count: 1 + + Metric: gen_ai.client.token.usage + Token type: input, Sum: 24, Count: 1 + Token type: output, Sum: 7, Count: 1 + + +Key Implementation Differences from LangChain +---------------------------------------------- + +**1. Event-Based Callbacks** + +LlamaIndex uses ``on_event_start(event_type, ...)`` and ``on_event_end(event_type, ...)`` +instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``). + +Event types are dispatched via ``CBEventType`` enum:: + + CBEventType.LLM # LLM invocations + CBEventType.AGENT # Agent steps + CBEventType.EMBEDDING # Embedding operations + +**2. Handler Registration** + +LlamaIndex uses ``handlers`` list:: + + callback_manager.handlers.append(handler) + +LangChain uses ``inheritable_handlers``:: + + callback_manager.inheritable_handlers.append(handler) + +**3. Response Structure** + +LlamaIndex ``ChatMessage`` uses ``blocks`` (list of TextBlock objects):: + + message.content # Computed property from blocks[0].text + +LangChain uses simple strings:: + + message.content # Direct string property + +**4. 
Token Usage** + +LlamaIndex returns objects (not dicts):: + + response.raw.usage.prompt_tokens # Object attribute + response.raw.usage.completion_tokens # Object attribute + +LangChain returns dicts:: + + response["usage"]["prompt_tokens"] # Dict key + response["usage"]["completion_tokens"] # Dict key + + +References +---------- + +* `OpenTelemetry Project `_ +* `LlamaIndex `_ +* `LlamaIndex Callbacks `_ From 8e39da211f1be022c8d5e1f9e09eefccb8ae0bc9 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 18 Nov 2025 14:39:39 -0800 Subject: [PATCH 3/8] feat: Add embedding instrumentation for LlamaIndex - Add embedding event handlers (_handle_embedding_start, _handle_embedding_end) - Extract model name, input texts, and dimension count from embedding events - Create vendor_detection.py module with VendorRule-based provider detection - Support 13+ embedding providers (OpenAI, Azure, AWS, Google, Cohere, etc.) - Add test_embedding_instrumentation.py with single and batch embedding tests - Update README with embedding documentation and provider list - Tested successfully with OpenAI embeddings API --- .../README.rst | 70 +++++++- .../llamaindex/callback_handler.py | 89 ++++++++++- .../llamaindex/vendor_detection.py | 119 ++++++++++++++ .../tests/test_embedding_instrumentation.py | 151 ++++++++++++++++++ 4 files changed, 422 insertions(+), 7 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst index 5371d3c..bf9ea59 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst @@ -65,6 +65,8 @@ Quick Start Running Tests ------------- +**LLM Tests**: + .. code-block:: bash # Set environment variables @@ -75,11 +77,23 @@ Running Tests cd tests python test_llm_instrumentation.py +**Embedding Tests**: + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_embedding_instrumentation.py + Expected Output --------------- -**Span Attributes**:: +**LLM Span Attributes**:: { "gen_ai.framework": "llamaindex", @@ -89,6 +103,15 @@ Expected Output "gen_ai.usage.output_tokens": 7 } +**Embedding Span Attributes**:: + + { + "gen_ai.operation.name": "embeddings", + "gen_ai.request.model": "text-embedding-3-small", + "gen_ai.provider.name": "openai", + "gen_ai.embeddings.dimension.count": 1536 + } + **Metrics**:: Metric: gen_ai.client.operation.duration @@ -110,9 +133,9 @@ instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``) Event types are dispatched via ``CBEventType`` enum:: - CBEventType.LLM # LLM invocations - CBEventType.AGENT # Agent steps - CBEventType.EMBEDDING # Embedding operations + CBEventType.LLM # LLM invocations (chat, complete) + CBEventType.AGENT # Agent steps (not yet instrumented) + CBEventType.EMBEDDING # Embedding operations (get_text_embedding, get_text_embedding_batch) **2. 
Handler Registration** @@ -147,6 +170,45 @@ LangChain returns dicts:: response["usage"]["completion_tokens"] # Dict key +Supported Features +------------------ + +**LLM Operations** + +* ✅ Chat completion (``llm.chat()``, ``llm.stream_chat()``) +* ✅ Text completion (``llm.complete()``, ``llm.stream_complete()``) +* ✅ Token usage tracking +* ✅ Model name detection +* ✅ Framework attribution + +**Embedding Operations** + +* ✅ Single text embedding (``embed_model.get_text_embedding()``) +* ✅ Batch embedding (``embed_model.get_text_embedding_batch()``) +* ✅ Query embedding (``embed_model.get_query_embedding()``) +* ✅ Provider detection (OpenAI, Azure, AWS Bedrock, Google, Cohere, HuggingFace, Ollama, and more) +* ✅ Dimension count tracking +* ✅ Input text capture + +**Provider Detection** + +Embedding instrumentation automatically detects the provider from class names: + +* **OpenAI**: ``OpenAIEmbedding`` +* **Azure**: ``AzureOpenAIEmbedding`` +* **AWS**: ``BedrockEmbedding`` +* **Google**: ``GeminiEmbedding``, ``VertexTextEmbedding``, ``GooglePaLMEmbedding`` +* **Cohere**: ``CohereEmbedding`` +* **HuggingFace**: ``HuggingFaceEmbedding``, ``HuggingFaceInferenceAPIEmbedding`` +* **Ollama**: ``OllamaEmbedding`` +* **Anthropic**: ``AnthropicEmbedding`` +* **MistralAI**: ``MistralAIEmbedding`` +* **Together**: ``TogetherEmbedding`` +* **Fireworks**: ``FireworksEmbedding`` +* **Voyage**: ``VoyageEmbedding`` +* **Jina**: ``JinaEmbedding`` + + References ---------- diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index 7846cfb..fe9a1a1 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -5,12 +5,15 @@ from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( + EmbeddingInvocation, InputMessage, LLMInvocation, OutputMessage, Text, ) +from .vendor_detection import detect_vendor_from_class + def _safe_str(value: Any) -> str: """Safely convert value to string.""" @@ -21,7 +24,7 @@ def _safe_str(value: Any) -> str: class LlamaindexCallbackHandler(BaseCallbackHandler): - """Simplified LlamaIndex callback handler - LLM invocation only.""" + """LlamaIndex callback handler supporting LLM and Embedding instrumentation.""" def __init__( self, @@ -53,9 +56,11 @@ def on_event_start( parent_id: str = "", **kwargs: Any, ) -> str: - """Handle event start - only processing LLM events.""" + """Handle event start - processing LLM and EMBEDDING events.""" if event_type == CBEventType.LLM: self._handle_llm_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + self._handle_embedding_start(event_id, parent_id, payload, **kwargs) return event_id def on_event_end( @@ -65,9 +70,11 @@ def on_event_end( event_id: str = "", **kwargs: Any, ) -> None: - """Handle event end - only processing LLM events.""" + """Handle event end - processing LLM and EMBEDDING events.""" if event_type == CBEventType.LLM: self._handle_llm_end(event_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + self._handle_embedding_end(event_id, payload, **kwargs) def _handle_llm_start( self, @@ -220,3 +227,79 @@ def 
_handle_llm_end( # Stop the LLM invocation self._handler.stop_llm(llm_inv) + + def _handle_embedding_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle embedding invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model_name") + or serialized.get("model") + or "unknown" + ) + + # Detect provider from class name + class_name = serialized.get("class_name", "") + provider = detect_vendor_from_class(class_name) + + # Note: input texts are not available at start time in LlamaIndex + # They will be available in the end event payload + + # Create embedding invocation with event_id as run_id + emb_inv = EmbeddingInvocation( + request_model=_safe_str(model_name), + input_texts=[], # Will be populated on end event + provider=provider, + attributes={}, + run_id=event_id, + ) + emb_inv.framework = "llamaindex" + + # Start the embedding invocation + self._handler.start_embedding(emb_inv) + + def _handle_embedding_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle embedding invocation end.""" + if not self._handler: + return + + # Get the embedding invocation from handler's registry using event_id + emb_inv = self._handler.get_entity(event_id) + if not emb_inv or not isinstance(emb_inv, EmbeddingInvocation): + return + + if payload: + # Extract input chunks (texts) from response + # chunks is the list of input texts that were embedded + chunks = payload.get("chunks", []) + if chunks: + emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks] + + # Extract embedding vectors from response + # embeddings is the list of output vectors + embeddings = payload.get("embeddings", []) + + # Determine dimension from first embedding vector + if embeddings and len(embeddings) > 0: + first_embedding = embeddings[0] + if isinstance(first_embedding, list): + emb_inv.dimension_count = len(first_embedding) + elif hasattr(first_embedding, "__len__"): + emb_inv.dimension_count = len(first_embedding) + + # Stop the embedding invocation + self._handler.stop_embedding(emb_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py new file mode 100644 index 0000000..6f9c9f0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py @@ -0,0 +1,119 @@ +"""Vendor detection for LlamaIndex embedding providers.""" + +from dataclasses import dataclass +from typing import List, Set + + +@dataclass(frozen=True) +class VendorRule: + """Rule for detecting vendor from LlamaIndex class names.""" + + exact_matches: Set[str] + patterns: List[str] + vendor_name: str + + def matches(self, class_name: str) -> bool: + """Check if class name matches this vendor rule.""" + if class_name in self.exact_matches: + return True + class_lower = class_name.lower() + return any(pattern in class_lower for pattern in self.patterns) + + +def _get_vendor_rules() -> List[VendorRule]: + """ + Get vendor detection rules ordered by specificity (most specific first). 
+ + Returns: + List of VendorRule objects for detecting embedding vendors from class names + """ + return [ + VendorRule( + exact_matches={"AzureOpenAIEmbedding"}, + patterns=["azure"], + vendor_name="azure", + ), + VendorRule( + exact_matches={"OpenAIEmbedding"}, + patterns=["openai"], + vendor_name="openai", + ), + VendorRule( + exact_matches={"BedrockEmbedding"}, + patterns=["bedrock", "aws"], + vendor_name="aws", + ), + VendorRule( + exact_matches={"VertexTextEmbedding", "GeminiEmbedding", "GooglePaLMEmbedding"}, + patterns=["vertex", "google", "palm", "gemini"], + vendor_name="google", + ), + VendorRule( + exact_matches={"CohereEmbedding"}, + patterns=["cohere"], + vendor_name="cohere", + ), + VendorRule( + exact_matches={"HuggingFaceEmbedding", "HuggingFaceInferenceAPIEmbedding"}, + patterns=["huggingface"], + vendor_name="huggingface", + ), + VendorRule( + exact_matches={"OllamaEmbedding"}, + patterns=["ollama"], + vendor_name="ollama", + ), + VendorRule( + exact_matches={"AnthropicEmbedding"}, + patterns=["anthropic"], + vendor_name="anthropic", + ), + VendorRule( + exact_matches={"MistralAIEmbedding"}, + patterns=["mistral"], + vendor_name="mistralai", + ), + VendorRule( + exact_matches={"TogetherEmbedding"}, + patterns=["together"], + vendor_name="together", + ), + VendorRule( + exact_matches={"FireworksEmbedding"}, + patterns=["fireworks"], + vendor_name="fireworks", + ), + VendorRule( + exact_matches={"VoyageEmbedding"}, + patterns=["voyage"], + vendor_name="voyage", + ), + VendorRule( + exact_matches={"JinaEmbedding"}, + patterns=["jina"], + vendor_name="jina", + ), + ] + + +def detect_vendor_from_class(class_name: str) -> str: + """ + Detect vendor from LlamaIndex embedding class name. + Uses unified detection rules combining exact matches and patterns. 
+ + Args: + class_name: The class name from serialized embedding information + + Returns: + Vendor string (lowercase), defaults to None if no match found + """ + if not class_name: + return None + + vendor_rules = _get_vendor_rules() + + for rule in vendor_rules: + if rule.matches(class_name): + return rule.vendor_name + + return None diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py new file mode 100644 index 0000000..355a057 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py @@ -0,0 +1,151 @@ +"""Test embedding instrumentation for LlamaIndex.""" + +import os + +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager +from llama_index.embeddings.openai import OpenAIEmbedding +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + + +# Global setup - shared across tests +metric_reader = None +instrumentor = None + + +def setup_telemetry(): + """Setup OpenTelemetry with span and metric exporters (once).""" + global metric_reader, instrumentor + + if metric_reader is not None: + return metric_reader + + # Enable metrics + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation once + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=metrics.get_meter_provider(), + ) + + return metric_reader + + +def test_embedding_single_text(): + """Test single text embedding instrumentation.""" + print("\nTest: Single Text Embedding") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate single embedding + text = "LlamaIndex is a data framework for LLM applications" + embedding = embed_model.get_text_embedding(text) + + print(f"\nText: {text}") + print(f"Embedding dimension: {len(embedding)}") + print(f"First 5 values: {embedding[:5]}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: 
{data_point.value}") + + print("\nTest completed successfully") + + +def test_embedding_batch(): + """Test batch embedding instrumentation.""" + print("\nTest: Batch Embeddings") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate batch embeddings + texts = [ + "Paris is the capital of France", + "Berlin is the capital of Germany", + "Rome is the capital of Italy", + ] + embeddings = embed_model.get_text_embedding_batch(texts) + + print(f"\nEmbedded {len(embeddings)} texts") + print(f"Dimension: {len(embeddings[0])}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: {data_point.value}") + + print("\nTest completed successfully") + + +if __name__ == "__main__": + test_embedding_single_text() + print("\n" + "=" * 60 + "\n") + test_embedding_batch() + + # Cleanup + if instrumentor: + instrumentor.uninstrument() From 43cfe8d25c1f151d2262c023d6157069c28bce52 Mon Sep 17 00:00:00 2001 From: shuningc Date: Wed, 26 Nov 2025 15:36:04 -0800 Subject: [PATCH 4/8] Adding single agent tool instrumentation for llamaindex --- .../instrumentation/llamaindex/__init__.py | 14 ++ .../llamaindex/callback_handler.py | 186 +++++++++++++++++- .../llamaindex/workflow_instrumentation.py | 166 ++++++++++++++++ .../tests/test_workflow_agent.py | 90 +++++++++ 4 files changed, 454 insertions(+), 2 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py index c3bea98..7aeb05c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -4,6 +4,9 @@ from opentelemetry.instrumentation.llamaindex.callback_handler import ( LlamaindexCallbackHandler, ) +from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import ( + wrap_agent_run, +) from wrapt import wrap_function_wrapper _instruments = ("llama-index-core >= 0.14.0",) @@ -48,6 +51,17 @@ def _instrument(self, **kwargs): wrapper=_BaseCallbackManagerInitWrapper(llamaindexCallBackHandler), ) + # Instrument Workflow-based agents + try: + wrap_function_wrapper( + module="llama_index.core.agent", + name="ReActAgent.run", + wrapper=wrap_agent_run, + ) + except Exception: + # ReActAgent might not be available or importable + pass + def 
_uninstrument(self, **kwargs): pass diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index fe9a1a1..c7f00ad 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -5,11 +5,15 @@ from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( + AgentInvocation, EmbeddingInvocation, InputMessage, LLMInvocation, OutputMessage, + Step, Text, + ToolCall, + Workflow, ) from .vendor_detection import detect_vendor_from_class @@ -56,11 +60,15 @@ def on_event_start( parent_id: str = "", **kwargs: Any, ) -> str: - """Handle event start - processing LLM and EMBEDDING events.""" + """Handle event start - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events.""" if event_type == CBEventType.LLM: self._handle_llm_start(event_id, parent_id, payload, **kwargs) elif event_type == CBEventType.EMBEDDING: self._handle_embedding_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.AGENT_STEP: + self._handle_agent_step_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.FUNCTION_CALL: + self._handle_function_call_start(event_id, parent_id, payload, **kwargs) return event_id def on_event_end( @@ -70,11 +78,15 @@ def on_event_end( event_id: str = "", **kwargs: Any, ) -> None: - """Handle event end - processing LLM and EMBEDDING events.""" + """Handle event end - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events.""" if event_type == CBEventType.LLM: self._handle_llm_end(event_id, payload, **kwargs) elif event_type == CBEventType.EMBEDDING: self._handle_embedding_end(event_id, payload, **kwargs) + elif event_type == CBEventType.AGENT_STEP: + self._handle_agent_step_end(event_id, payload, **kwargs) + elif event_type == CBEventType.FUNCTION_CALL: + self._handle_function_call_end(event_id, payload, **kwargs) def _handle_llm_start( self, @@ -303,3 +315,173 @@ def _handle_embedding_end( # Stop the embedding invocation self._handler.stop_embedding(emb_inv) + + def _find_nearest_agent(self, parent_id: Optional[str]) -> Optional[AgentInvocation]: + """Walk up parent chain to find the nearest agent invocation.""" + if not self._handler: + return None + current_id = parent_id + while current_id: + entity = self._handler.get_entity(current_id) + if isinstance(entity, AgentInvocation): + return entity + # Move to parent + current_id = getattr(entity, "parent_run_id", None) + if current_id: + current_id = str(current_id) + return None + + def _handle_agent_step_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle agent step start - create AgentInvocation span.""" + if not self._handler or not payload: + return + + # Extract agent information from payload + task_id = payload.get("task_id", "") + input_text = payload.get("input") + step = payload.get("step") # TaskStep object with agent metadata + + # Extract agent metadata from step or payload + agent_name = None + agent_type = None + agent_description = None + model_name = None + + if step and hasattr(step, "step_state"): + # Try to get agent 
from step state + step_state = step.step_state + if hasattr(step_state, "agent"): + agent = step_state.agent + agent_name = getattr(agent, "name", None) + agent_type = getattr(agent, "agent_type", None) or type(agent).__name__ + agent_description = getattr(agent, "description", None) + # Try to get model from agent's LLM + if hasattr(agent, "llm"): + llm = agent.llm + model_name = getattr(llm, "model", None) or getattr(llm, "model_name", None) + + # Create AgentInvocation for the agent execution + agent_invocation = AgentInvocation( + name=f"agent.task.{task_id}" if task_id else "agent.invoke", + run_id=event_id, + parent_run_id=parent_id if parent_id else None, + input_context=input_text if input_text else "", + attributes={}, + ) + agent_invocation.framework = "llamaindex" + + # Set enhanced metadata + if agent_name: + agent_invocation.agent_name = _safe_str(agent_name) + if agent_type: + agent_invocation.agent_type = _safe_str(agent_type) + if agent_description: + agent_invocation.description = _safe_str(agent_description) + if model_name: + agent_invocation.model = _safe_str(model_name) + + self._handler.start_agent_invocation(agent_invocation) + + def _handle_agent_step_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle agent step end.""" + if not self._handler: + return + + agent_invocation = self._handler.get_entity(event_id) + if not agent_invocation or not isinstance(agent_invocation, AgentInvocation): + return + + if payload: + # Extract response/output if available + response = payload.get("response") + if response: + agent_invocation.output_result = _safe_str(response) + + # Stop the agent invocation + self._handler.stop_agent_invocation(agent_invocation) + + def _handle_function_call_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle function/tool call start.""" + if not self._handler or not payload: + return + + # Extract tool information + tool = payload.get("tool") + if not tool: + return + + tool_name = getattr(tool, "name", "unknown_tool") if hasattr(tool, "name") else "unknown_tool" + tool_description = getattr(tool, "description", "") if hasattr(tool, "description") else "" + + # Extract function arguments + function_call = payload.get("function_call", {}) + arguments = function_call if function_call else {} + + # Find nearest agent for context propagation + context_agent = self._find_nearest_agent(parent_id) if parent_id else None + + # Create ToolCall entity + tool_call = ToolCall( + name=tool_name, + arguments=arguments, + id=event_id, + ) + + # Set attributes + tool_call.attributes = { + "tool.description": tool_description, + } + tool_call.run_id = event_id # type: ignore[attr-defined] + tool_call.parent_run_id = parent_id if parent_id else None # type: ignore[attr-defined] + tool_call.framework = "llamaindex" # type: ignore[attr-defined] + + # Propagate agent context to tool call + if context_agent: + agent_name = getattr(context_agent, "agent_name", None) or getattr(context_agent, "name", None) + if agent_name: + tool_call.agent_name = _safe_str(agent_name) # type: ignore[attr-defined] + tool_call.agent_id = str(context_agent.run_id) # type: ignore[attr-defined] + + # Start the tool call + self._handler.start_tool_call(tool_call) + + def _handle_function_call_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle function/tool call end.""" + if not self._handler: + 
return + + tool_call = self._handler.get_entity(event_id) + if not tool_call or not isinstance(tool_call, ToolCall): + return + + if payload: + # Extract tool output/result + tool_output = payload.get("tool_output") + if tool_output: + # Store the result as response + tool_call.response = _safe_str(tool_output) # type: ignore[attr-defined] + + # Stop the tool call + self._handler.stop_tool_call(tool_call) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py new file mode 100644 index 0000000..8449c3e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -0,0 +1,166 @@ +""" +Workflow-based agent instrumentation for LlamaIndex. + +This module provides instrumentation for Workflow-based agents (ReActAgent, etc.) +by intercepting workflow event streams to capture agent steps and tool calls. +""" + +import asyncio +from typing import Any, Optional +from uuid import uuid4 + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import AgentInvocation, ToolCall + + +class WorkflowEventInstrumentor: + """Instrumentor that wraps WorkflowHandler to capture agent and tool events.""" + + def __init__(self, handler: TelemetryHandler): + self._handler = handler + self._active_agents = {} # event_id -> AgentInvocation + self._active_tools = {} # tool_id -> ToolCall + + async def instrument_workflow_handler(self, workflow_handler, initial_message: str): + """ + Instrument a WorkflowHandler by streaming its events and creating telemetry spans. 
+ + Args: + workflow_handler: The WorkflowHandler returned by agent.run() + initial_message: The user's initial message to the agent + """ + from llama_index.core.agent.workflow.workflow_events import ( + AgentInput, + AgentOutput, + ToolCall as WorkflowToolCall, + ToolCallResult, + ) + + agent_invocation = None + agent_run_id = None + + try: + async for event in workflow_handler.stream_events(): + # Agent step start + if isinstance(event, AgentInput): + # Start a new agent invocation + agent_run_id = str(uuid4()) + agent_invocation = AgentInvocation( + name=f"agent.{event.current_agent_name}", + run_id=agent_run_id, + input_context=str(event.input) if hasattr(event, 'input') and event.input else initial_message, + attributes={}, + ) + agent_invocation.framework = "llamaindex" + agent_invocation.agent_name = event.current_agent_name + + self._handler.start_agent(agent_invocation) + self._active_agents[agent_run_id] = agent_invocation + + # Tool call start + elif isinstance(event, WorkflowToolCall): + tool_call = ToolCall( + arguments=event.tool_kwargs, + name=event.tool_name, + id=event.tool_id, + attributes={}, + ) + tool_call.framework = "llamaindex" + + # Associate with current agent if available + if agent_invocation: + tool_call.agent_name = agent_invocation.agent_name + tool_call.agent_id = str(agent_invocation.run_id) + # Set parent_span explicitly - the agent span is the parent of this tool + if hasattr(agent_invocation, 'span') and agent_invocation.span: + tool_call.parent_span = agent_invocation.span + + self._handler.start_tool_call(tool_call) + self._active_tools[event.tool_id] = tool_call + + # Tool call end + elif isinstance(event, ToolCallResult): + tool_call = self._active_tools.get(event.tool_id) + if tool_call: + # Extract result + result = event.tool_output + if hasattr(result, 'content'): + tool_call.result = str(result.content) + else: + tool_call.result = str(result) + + self._handler.stop_tool_call(tool_call) + del self._active_tools[event.tool_id] + + # Agent step end (when no more tools to call) + elif isinstance(event, AgentOutput): + # Check if this is the final output (no tool calls) + if not event.tool_calls and agent_invocation: + # Extract response + if hasattr(event.response, 'content'): + agent_invocation.output_result = str(event.response.content) + else: + agent_invocation.output_result = str(event.response) + + self._handler.stop_agent(agent_invocation) + if agent_run_id: + del self._active_agents[agent_run_id] + agent_invocation = None + agent_run_id = None + + except Exception as e: + # Clean up any active spans on error + for tool_call in list(self._active_tools.values()): + from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) + self._handler.fail_tool_call(tool_call, error) + self._active_tools.clear() + + if agent_invocation: + from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) + self._handler.fail_agent(agent_invocation, error) + if agent_run_id: + del self._active_agents[agent_run_id] + + raise + + +def wrap_agent_run(wrapped, instance, args, kwargs): + """ + Wrap agent.run() to instrument workflow events. + + This function wraps the run() method of Workflow-based agents to capture + agent steps and tool calls via workflow event streaming. 
+ """ + handler = wrapped(*args, **kwargs) + + # Get the initial user message + user_msg = kwargs.get('user_msg') or (args[0] if args else "") + + # Get TelemetryHandler from callback handler if available + from llama_index.core import Settings + telemetry_handler = None + for callback_handler in Settings.callback_manager.handlers: + if hasattr(callback_handler, '_handler'): + telemetry_handler = callback_handler._handler + break + + if telemetry_handler: + # Create workflow instrumentor + instrumentor = WorkflowEventInstrumentor(telemetry_handler) + + # Start background task to stream events + async def stream_events_background(): + try: + await instrumentor.instrument_workflow_handler(handler, str(user_msg)) + except Exception as e: + # Log error but don't crash the workflow + import logging + logger = logging.getLogger(__name__) + logger.warning(f"Error instrumenting workflow events: {e}") + + # Launch background task + asyncio.create_task(stream_events_background()) + + return handler diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py new file mode 100644 index 0000000..ddfcfa5 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py @@ -0,0 +1,90 @@ +""" +Test Workflow-based agent instrumentation. + +This test validates that workflow event streaming captures agent steps and tool calls. +""" +import asyncio +from llama_index.core.agent import ReActAgent +from llama_index.core import Settings +from llama_index.llms.openai import OpenAI +from llama_index.core.tools import FunctionTool +from opentelemetry import trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + + +def multiply(a: int, b: int) -> int: + """Multiply two numbers.""" + return a * b + + +def add(a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + +def setup_telemetry(): + """Setup OpenTelemetry with console exporter.""" + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + return tracer_provider + + +async def test_workflow_agent(): + """Test Workflow-based agent instrumentation.""" + + print("=" * 80) + print("Setting up telemetry...") + print("=" * 80) + tracer_provider = setup_telemetry() + + # Setup LlamaIndex + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.1) + + # Instrument + print("\n" + "=" * 80) + print("Instrumenting LlamaIndex...") + print("=" * 80) + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create tools + multiply_tool = FunctionTool.from_defaults(fn=multiply) + add_tool = FunctionTool.from_defaults(fn=add) + + print("\n" + "=" * 80) + print("Creating Workflow-based ReActAgent...") + print("=" * 80) + agent = ReActAgent(tools=[multiply_tool, add_tool], llm=Settings.llm, verbose=True) + + print("\n" + "=" * 80) + print("Running agent task (should see AgentInvocation -> ToolCall spans)...") + print("=" * 80) + + handler = agent.run(user_msg="Calculate 5 times 3, then add 2 to the result") + result = await handler + + # Give background instrumentation task time to complete + await asyncio.sleep(0.5) + + print("\n" + "=" * 80) + 
print("RESULTS") + print("=" * 80) + print(f"Response: {result.response.content}") + + print("\n" + "=" * 80) + print("✓ Test completed!") + print("=" * 80) + print("\nExpected trace structure:") + print(" AgentInvocation (gen_ai.agent.name=agent.Agent)") + print(" ├─ LLMInvocation") + print(" ├─ ToolCall (gen_ai.tool.name=multiply)") + print(" ├─ ToolCall (gen_ai.tool.name=add)") + print(" └─ LLMInvocation (final answer)") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(test_workflow_agent()) From 1cabb4d1228d22e89bdb3ba59da2ddedc008886e Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 9 Dec 2025 05:10:22 -0800 Subject: [PATCH 5/8] Adding travel_planner_agent example to llamaindex instrumentation --- .../examples/travel_planner_agent.py | 89 ++++++++++++++++++ .../llamaindex/callback_handler.py | 35 +++++++ .../llamaindex/workflow_instrumentation.py | 94 +++++++++++++++---- .../tests/test_workflow_agent.py | 74 ++++++++++++++- 4 files changed, 273 insertions(+), 19 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py new file mode 100644 index 0000000..734c42c --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py @@ -0,0 +1,89 @@ +import asyncio +import os +import sys + +from llama_index.core.agent import ReActAgent +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI +from llama_index.core import Settings + +from opentelemetry import trace, metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + +# 1. Setup Telemetry +def setup_telemetry(): + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(insecure=True)) + ) + + metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(insecure=True)) + metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) + +# 2. Define Tools +def search_flights(origin: str, destination: str, date: str) -> str: + """Search for flights between two cities on a specific date.""" + print(f" [Tool] Searching flights from {origin} to {destination} on {date}...") + return f"Flight UA123 from {origin} to {destination} on {date} costs $500." + +def search_hotels(city: str, check_in: str) -> str: + """Search for hotels in a city.""" + print(f" [Tool] Searching hotels in {city} for {check_in}...") + return f"Hotel Grand in {city} is available for $200/night." + +def book_ticket(flight_number: str) -> str: + """Book a flight ticket.""" + print(f" [Tool] Booking flight {flight_number}...") + return f"Confirmed booking for {flight_number}. Ticket #999." + +# 3. 
Main Agent Logic +async def run_travel_planner(): + # Check for API Key + if not os.getenv("OPENAI_API_KEY"): + print("Error: OPENAI_API_KEY environment variable is not set.") + sys.exit(1) + + setup_telemetry() + + # Instrument LlamaIndex + LlamaindexInstrumentor().instrument() + + # Setup LLM + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0) + + # Create Tools + tools = [ + FunctionTool.from_defaults(fn=search_flights), + FunctionTool.from_defaults(fn=search_hotels), + FunctionTool.from_defaults(fn=book_ticket), + ] + + # Create Agent + # ReActAgent in LlamaIndex uses the workflow engine internally + agent = ReActAgent(tools=tools, llm=Settings.llm, verbose=True) + + # Run Workflow + user_request = "I want to fly from New York to Paris on 2023-12-01. Find a flight and book it, then find a hotel." + + # We use the async run method which returns the handler we instrumented + # This triggers wrap_agent_run -> WorkflowEventInstrumentor + handler = agent.run(user_msg=user_request) + response = await handler + + print(f"\nFinal Response: {response}") + + # Ensure spans are flushed before exit + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() + if hasattr(provider, "shutdown"): + provider.shutdown() + +if __name__ == "__main__": + asyncio.run(run_travel_planner()) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index c7f00ad..e15419c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -3,6 +3,7 @@ from llama_index.core.callbacks.base_handler import BaseCallbackHandler from llama_index.core.callbacks.schema import CBEventType +from opentelemetry import trace from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( AgentInvocation, @@ -158,6 +159,23 @@ def _handle_llm_start( ) llm_inv.framework = "llamaindex" + # Get the currently active span to establish parent-child relationship + # First try to get from active agent context (workflow-based agents) + parent_span = None + if self._handler._agent_context_stack: + # Get the current agent's span from the span registry + _, agent_run_id = self._handler._agent_context_stack[-1] + parent_span = self._handler._span_registry.get(agent_run_id) + + # Fallback to OpenTelemetry context if no agent span found + if not parent_span: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + parent_span = current_span + + if parent_span: + llm_inv.parent_span = parent_span + # Start the LLM invocation (handler stores it in _entity_registry) self._handler.start_llm(llm_inv) @@ -276,6 +294,23 @@ def _handle_embedding_start( ) emb_inv.framework = "llamaindex" + # Get the currently active span to establish parent-child relationship + # First try to get from active agent context (workflow-based agents) + parent_span = None + if self._handler._agent_context_stack: + # Get the current agent's span from the span registry + _, agent_run_id = self._handler._agent_context_stack[-1] + parent_span = self._handler._span_registry.get(agent_run_id) + + # Fallback to OpenTelemetry context 
if no agent span found + if not parent_span: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + parent_span = current_span + + if parent_span: + emb_inv.parent_span = parent_span + # Start the embedding invocation self._handler.start_embedding(emb_inv) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py index 8449c3e..db49fc2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -130,37 +130,99 @@ def wrap_agent_run(wrapped, instance, args, kwargs): """ Wrap agent.run() to instrument workflow events. - This function wraps the run() method of Workflow-based agents to capture - agent steps and tool calls via workflow event streaming. - """ - handler = wrapped(*args, **kwargs) + This creates a root agent span immediately when agent.run() is called, + ensuring all LLM and tool calls inherit the same trace context. + The root span is pushed onto the agent_context_stack, which allows the + callback handler to retrieve it when LLM/embedding events occur. + """ # Get the initial user message user_msg = kwargs.get('user_msg') or (args[0] if args else "") # Get TelemetryHandler from callback handler if available from llama_index.core import Settings + from opentelemetry.util.genai.types import AgentInvocation + telemetry_handler = None for callback_handler in Settings.callback_manager.handlers: if hasattr(callback_handler, '_handler'): telemetry_handler = callback_handler._handler break + # Create a root agent span immediately to ensure all subsequent calls + # (LLM, tools) inherit this trace context + root_agent = None if telemetry_handler: - # Create workflow instrumentor + from uuid import uuid4 + + # Create root agent invocation before workflow starts + root_agent = AgentInvocation( + name=f"agent.{type(instance).__name__}", + run_id=str(uuid4()), + input_context=str(user_msg), + attributes={}, + ) + root_agent.framework = "llamaindex" + root_agent.agent_name = type(instance).__name__ + + # Start the root agent span immediately + # This pushes (agent_name, run_id) onto the _agent_context_stack + # and stores the span in _span_registry[run_id] + telemetry_handler.start_agent(root_agent) + + # Call the original run() method to get the workflow handler + handler = wrapped(*args, **kwargs) + + if telemetry_handler and root_agent: + # Create workflow instrumentor for detailed step tracking instrumentor = WorkflowEventInstrumentor(telemetry_handler) - # Start background task to stream events - async def stream_events_background(): - try: - await instrumentor.instrument_workflow_handler(handler, str(user_msg)) - except Exception as e: - # Log error but don't crash the workflow - import logging - logger = logging.getLogger(__name__) - logger.warning(f"Error instrumenting workflow events: {e}") + # Wrap the handler to close the root span when the workflow completes + original_handler = handler + + class InstrumentedHandler: + """Wrapper that closes the root agent span when workflow completes.""" + def __init__(self, original, root_span_agent): + self._original = original + self._root_agent = 
root_span_agent + self._result = None + + def __await__(self): + # Start background task to instrument workflow events + async def stream_events(): + try: + await instrumentor.instrument_workflow_handler( + self._original, str(user_msg) + ) + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.warning(f"Error instrumenting workflow events: {e}") + + asyncio.create_task(stream_events()) + + # Wait for the actual workflow to complete and return the result + return self._await_impl().__await__() + + async def _await_impl(self): + """Actual async implementation.""" + try: + self._result = await self._original + self._root_agent.output_result = str(self._result) + telemetry_handler.stop_agent(self._root_agent) + except Exception as e: + from opentelemetry.util.genai.types import Error + telemetry_handler.fail_agent( + self._root_agent, + Error(message=str(e), type=type(e)) + ) + raise + return self._result + + def __getattr__(self, name): + # Delegate all other attributes to the original handler + return getattr(self._original, name) - # Launch background task - asyncio.create_task(stream_events_background()) + handler = InstrumentedHandler(original_handler, root_agent) return handler diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py index ddfcfa5..6dd5ec9 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py @@ -4,9 +4,12 @@ This test validates that workflow event streaming captures agent steps and tool calls. """ import asyncio +import pytest +from typing import List from llama_index.core.agent import ReActAgent from llama_index.core import Settings -from llama_index.llms.openai import OpenAI +from llama_index.core.llms import MockLLM +from llama_index.core.base.llms.types import ChatMessage, MessageRole from llama_index.core.tools import FunctionTool from opentelemetry import trace from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor @@ -32,6 +35,56 @@ def setup_telemetry(): return tracer_provider +class SequenceMockLLM(MockLLM): + responses: List[ChatMessage] = [] + response_index: int = 0 + + def __init__(self, responses: List[ChatMessage], max_tokens: int = 256): + super().__init__(max_tokens=max_tokens) + self.responses = responses + self.response_index = 0 + + def chat(self, messages, **kwargs): + if self.response_index < len(self.responses): + response = self.responses[self.response_index] + self.response_index += 1 + from llama_index.core.base.llms.types import ChatResponse + return ChatResponse(message=response) + return ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")) + + async def achat(self, messages, **kwargs): + if self.response_index < len(self.responses): + response = self.responses[self.response_index] + self.response_index += 1 + from llama_index.core.base.llms.types import ChatResponse + return ChatResponse(message=response) + return ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")) + + def stream_chat(self, messages, **kwargs): + if self.response_index < len(self.responses): + response = self.responses[self.response_index] + self.response_index += 1 + from llama_index.core.base.llms.types import ChatResponseGen, ChatResponse + # Yield a single response chunk + 
yield ChatResponse(message=response, delta=response.content) + else: + yield ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), delta="Done.") + + async def astream_chat(self, messages, **kwargs): + async def gen(): + if self.response_index < len(self.responses): + response = self.responses[self.response_index] + self.response_index += 1 + from llama_index.core.base.llms.types import ChatResponse + # Yield a single response chunk + yield ChatResponse(message=response, delta=response.content) + else: + yield ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), delta="Done.") + + return gen() + + +@pytest.mark.asyncio async def test_workflow_agent(): """Test Workflow-based agent instrumentation.""" @@ -40,8 +93,23 @@ async def test_workflow_agent(): print("=" * 80) tracer_provider = setup_telemetry() - # Setup LlamaIndex - Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.1) + # Setup Mock LLM + mock_responses = [ + # Step 1: Decide to multiply + ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: I need to multiply 5 by 3 first. +Action: multiply +Action Input: {"a": 5, "b": 3}"""), + + # Step 2: Decide to add + ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: The result is 15. Now I need to add 2 to 15. +Action: add +Action Input: {"a": 15, "b": 2}"""), + + # Step 3: Final Answer + ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: The final result is 17. +Answer: The result is 17."""), + ] + Settings.llm = SequenceMockLLM(responses=mock_responses, max_tokens=256) # Instrument print("\n" + "=" * 80) From 4765dcbc91714cf62c83e222b7939fe5c280ab1c Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 9 Dec 2025 06:38:17 -0800 Subject: [PATCH 6/8] Fix ruff linting errors: remove unused imports and variables --- .../instrumentation/llamaindex/callback_handler.py | 2 -- .../instrumentation/llamaindex/workflow_instrumentation.py | 1 - .../tests/test_workflow_agent.py | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index e15419c..b4654e5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -11,10 +11,8 @@ InputMessage, LLMInvocation, OutputMessage, - Step, Text, ToolCall, - Workflow, ) from .vendor_detection import detect_vendor_from_class diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py index db49fc2..ae2fc96 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -6,7 +6,6 @@ """ import asyncio -from typing import Any, Optional from uuid import uuid4 from opentelemetry.util.genai.handler import TelemetryHandler diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py index 6dd5ec9..8c2d014 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py @@ -64,7 +64,7 @@ def stream_chat(self, messages, **kwargs): if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 - from llama_index.core.base.llms.types import ChatResponseGen, ChatResponse + from llama_index.core.base.llms.types import ChatResponse # Yield a single response chunk yield ChatResponse(message=response, delta=response.content) else: From 8e3b049fb463d795f1c45cb45f5693dd0b09b8c5 Mon Sep 17 00:00:00 2001 From: shuningc Date: Tue, 9 Dec 2025 06:46:23 -0800 Subject: [PATCH 7/8] Apply ruff formatting to LlamaIndex instrumentation files --- .../examples/travel_planner_agent.py | 12 ++- .../instrumentation/llamaindex/__init__.py | 4 +- .../llamaindex/callback_handler.py | 76 +++++++-------- .../llamaindex/vendor_detection.py | 6 +- .../llamaindex/workflow_instrumentation.py | 92 ++++++++++--------- .../tests/test_embedding_instrumentation.py | 6 +- .../tests/test_llm_instrumentation.py | 31 +++---- .../tests/test_workflow_agent.py | 54 +++++++---- 8 files changed, 159 insertions(+), 122 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py index 734c42c..234fc14 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_agent.py @@ -16,6 +16,7 @@ from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + # 1. Setup Telemetry def setup_telemetry(): trace.set_tracer_provider(TracerProvider()) @@ -26,22 +27,26 @@ def setup_telemetry(): metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter(insecure=True)) metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) + # 2. Define Tools def search_flights(origin: str, destination: str, date: str) -> str: """Search for flights between two cities on a specific date.""" print(f" [Tool] Searching flights from {origin} to {destination} on {date}...") return f"Flight UA123 from {origin} to {destination} on {date} costs $500." + def search_hotels(city: str, check_in: str) -> str: """Search for hotels in a city.""" print(f" [Tool] Searching hotels in {city} for {check_in}...") return f"Hotel Grand in {city} is available for $200/night." + def book_ticket(flight_number: str) -> str: """Book a flight ticket.""" print(f" [Tool] Booking flight {flight_number}...") return f"Confirmed booking for {flight_number}. Ticket #999." + # 3. Main Agent Logic async def run_travel_planner(): # Check for API Key @@ -50,7 +55,7 @@ async def run_travel_planner(): sys.exit(1) setup_telemetry() - + # Instrument LlamaIndex LlamaindexInstrumentor().instrument() @@ -70,12 +75,12 @@ async def run_travel_planner(): # Run Workflow user_request = "I want to fly from New York to Paris on 2023-12-01. Find a flight and book it, then find a hotel." 
- + # We use the async run method which returns the handler we instrumented # This triggers wrap_agent_run -> WorkflowEventInstrumentor handler = agent.run(user_msg=user_request) response = await handler - + print(f"\nFinal Response: {response}") # Ensure spans are flushed before exit @@ -85,5 +90,6 @@ async def run_travel_planner(): if hasattr(provider, "shutdown"): provider.shutdown() + if __name__ == "__main__": asyncio.run(run_travel_planner()) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py index 7aeb05c..64a2836 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -22,9 +22,7 @@ def __init__( super().__init__() Config._exception_logger = exception_logger Config.use_legacy_attributes = use_legacy_attributes - self._disable_trace_context_propagation = ( - disable_trace_context_propagation - ) + self._disable_trace_context_propagation = disable_trace_context_propagation self._telemetry_handler = None def instrumentation_dependencies(self): diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py index b4654e5..a59a1c0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -101,9 +101,7 @@ def _handle_llm_start( # Extract model information from payload serialized = payload.get("serialized", {}) model_name = ( - serialized.get("model") - or serialized.get("model_name") - or "unknown" + serialized.get("model") or serialized.get("model_name") or "unknown" ) # Extract messages from payload @@ -116,16 +114,12 @@ def _handle_llm_start( if hasattr(msg, "content") and hasattr(msg, "role"): # Extract role - could be MessageRole enum role_value = ( - str(msg.role.value) - if hasattr(msg.role, "value") - else str(msg.role) + str(msg.role.value) if hasattr(msg.role, "value") else str(msg.role) ) # Extract content - this is a property that pulls from blocks[0].text content = _safe_str(msg.content) input_messages.append( - InputMessage( - role=role_value, parts=[Text(content=content)] - ) + InputMessage(role=role_value, parts=[Text(content=content)]) ) elif isinstance(msg, dict): # Handle serialized messages (dict format) @@ -138,9 +132,7 @@ def _handle_llm_start( # Fallback to direct content field content = msg.get("content", "") - role_value = ( - str(role.value) if hasattr(role, "value") else str(role) - ) + role_value = str(role.value) if hasattr(role, "value") else str(role) input_messages.append( InputMessage( role=role_value, @@ -164,13 +156,13 @@ def _handle_llm_start( # Get the current agent's span from the span registry _, agent_run_id = self._handler._agent_context_stack[-1] parent_span = self._handler._span_registry.get(agent_run_id) - + # Fallback to OpenTelemetry context if no agent span found if not parent_span: current_span = 
trace.get_current_span() if current_span and current_span.is_recording(): parent_span = current_span - + if parent_span: llm_inv.parent_span = parent_span @@ -243,7 +235,7 @@ def _handle_llm_end( else: # It's an object, try to get usage attribute usage = getattr(raw_response, "usage", None) - + if usage: # usage could also be dict or object if isinstance(usage, dict): @@ -251,7 +243,9 @@ def _handle_llm_end( llm_inv.output_tokens = usage.get("completion_tokens") else: llm_inv.input_tokens = getattr(usage, "prompt_tokens", None) - llm_inv.output_tokens = getattr(usage, "completion_tokens", None) + llm_inv.output_tokens = getattr( + usage, "completion_tokens", None + ) # Stop the LLM invocation self._handler.stop_llm(llm_inv) @@ -270,9 +264,7 @@ def _handle_embedding_start( # Extract model information from payload serialized = payload.get("serialized", {}) model_name = ( - serialized.get("model_name") - or serialized.get("model") - or "unknown" + serialized.get("model_name") or serialized.get("model") or "unknown" ) # Detect provider from class name @@ -299,13 +291,13 @@ def _handle_embedding_start( # Get the current agent's span from the span registry _, agent_run_id = self._handler._agent_context_stack[-1] parent_span = self._handler._span_registry.get(agent_run_id) - + # Fallback to OpenTelemetry context if no agent span found if not parent_span: current_span = trace.get_current_span() if current_span and current_span.is_recording(): parent_span = current_span - + if parent_span: emb_inv.parent_span = parent_span @@ -333,11 +325,11 @@ def _handle_embedding_end( chunks = payload.get("chunks", []) if chunks: emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks] - + # Extract embedding vectors from response # embeddings is the list of output vectors embeddings = payload.get("embeddings", []) - + # Determine dimension from first embedding vector if embeddings and len(embeddings) > 0: first_embedding = embeddings[0] @@ -349,7 +341,9 @@ def _handle_embedding_end( # Stop the embedding invocation self._handler.stop_embedding(emb_inv) - def _find_nearest_agent(self, parent_id: Optional[str]) -> Optional[AgentInvocation]: + def _find_nearest_agent( + self, parent_id: Optional[str] + ) -> Optional[AgentInvocation]: """Walk up parent chain to find the nearest agent invocation.""" if not self._handler: return None @@ -385,7 +379,7 @@ def _handle_agent_step_start( agent_type = None agent_description = None model_name = None - + if step and hasattr(step, "step_state"): # Try to get agent from step state step_state = step.step_state @@ -397,7 +391,9 @@ def _handle_agent_step_start( # Try to get model from agent's LLM if hasattr(agent, "llm"): llm = agent.llm - model_name = getattr(llm, "model", None) or getattr(llm, "model_name", None) + model_name = getattr(llm, "model", None) or getattr( + llm, "model_name", None + ) # Create AgentInvocation for the agent execution agent_invocation = AgentInvocation( @@ -408,7 +404,7 @@ def _handle_agent_step_start( attributes={}, ) agent_invocation.framework = "llamaindex" - + # Set enhanced metadata if agent_name: agent_invocation.agent_name = _safe_str(agent_name) @@ -418,7 +414,7 @@ def _handle_agent_step_start( agent_invocation.description = _safe_str(agent_description) if model_name: agent_invocation.model = _safe_str(model_name) - + self._handler.start_agent_invocation(agent_invocation) def _handle_agent_step_end( @@ -459,10 +455,16 @@ def _handle_function_call_start( tool = payload.get("tool") if not tool: return - - tool_name = getattr(tool, "name", 
"unknown_tool") if hasattr(tool, "name") else "unknown_tool" - tool_description = getattr(tool, "description", "") if hasattr(tool, "description") else "" - + + tool_name = ( + getattr(tool, "name", "unknown_tool") + if hasattr(tool, "name") + else "unknown_tool" + ) + tool_description = ( + getattr(tool, "description", "") if hasattr(tool, "description") else "" + ) + # Extract function arguments function_call = payload.get("function_call", {}) arguments = function_call if function_call else {} @@ -476,7 +478,7 @@ def _handle_function_call_start( arguments=arguments, id=event_id, ) - + # Set attributes tool_call.attributes = { "tool.description": tool_description, @@ -484,14 +486,16 @@ def _handle_function_call_start( tool_call.run_id = event_id # type: ignore[attr-defined] tool_call.parent_run_id = parent_id if parent_id else None # type: ignore[attr-defined] tool_call.framework = "llamaindex" # type: ignore[attr-defined] - + # Propagate agent context to tool call if context_agent: - agent_name = getattr(context_agent, "agent_name", None) or getattr(context_agent, "name", None) + agent_name = getattr(context_agent, "agent_name", None) or getattr( + context_agent, "name", None + ) if agent_name: tool_call.agent_name = _safe_str(agent_name) # type: ignore[attr-defined] tool_call.agent_id = str(context_agent.run_id) # type: ignore[attr-defined] - + # Start the tool call self._handler.start_tool_call(tool_call) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py index 6f9c9f0..3feeaee 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py @@ -44,7 +44,11 @@ def _get_vendor_rules() -> List[VendorRule]: vendor_name="aws", ), VendorRule( - exact_matches={"VertexTextEmbedding", "GeminiEmbedding", "GooglePaLMEmbedding"}, + exact_matches={ + "VertexTextEmbedding", + "GeminiEmbedding", + "GooglePaLMEmbedding", + }, patterns=["vertex", "google", "palm", "gemini"], vendor_name="google", ), diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py index ae2fc96..bce3dce 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -14,16 +14,16 @@ class WorkflowEventInstrumentor: """Instrumentor that wraps WorkflowHandler to capture agent and tool events.""" - + def __init__(self, handler: TelemetryHandler): self._handler = handler self._active_agents = {} # event_id -> AgentInvocation - self._active_tools = {} # tool_id -> ToolCall - + self._active_tools = {} # tool_id -> ToolCall + async def instrument_workflow_handler(self, workflow_handler, initial_message: str): """ Instrument a WorkflowHandler by streaming its events and creating telemetry spans. 
- + Args: workflow_handler: The WorkflowHandler returned by agent.run() initial_message: The user's initial message to the agent @@ -34,10 +34,10 @@ async def instrument_workflow_handler(self, workflow_handler, initial_message: s ToolCall as WorkflowToolCall, ToolCallResult, ) - + agent_invocation = None agent_run_id = None - + try: async for event in workflow_handler.stream_events(): # Agent step start @@ -47,15 +47,17 @@ async def instrument_workflow_handler(self, workflow_handler, initial_message: s agent_invocation = AgentInvocation( name=f"agent.{event.current_agent_name}", run_id=agent_run_id, - input_context=str(event.input) if hasattr(event, 'input') and event.input else initial_message, + input_context=str(event.input) + if hasattr(event, "input") and event.input + else initial_message, attributes={}, ) agent_invocation.framework = "llamaindex" agent_invocation.agent_name = event.current_agent_name - + self._handler.start_agent(agent_invocation) self._active_agents[agent_run_id] = agent_invocation - + # Tool call start elif isinstance(event, WorkflowToolCall): tool_call = ToolCall( @@ -65,95 +67,97 @@ async def instrument_workflow_handler(self, workflow_handler, initial_message: s attributes={}, ) tool_call.framework = "llamaindex" - + # Associate with current agent if available if agent_invocation: tool_call.agent_name = agent_invocation.agent_name tool_call.agent_id = str(agent_invocation.run_id) # Set parent_span explicitly - the agent span is the parent of this tool - if hasattr(agent_invocation, 'span') and agent_invocation.span: + if hasattr(agent_invocation, "span") and agent_invocation.span: tool_call.parent_span = agent_invocation.span - + self._handler.start_tool_call(tool_call) self._active_tools[event.tool_id] = tool_call - + # Tool call end elif isinstance(event, ToolCallResult): tool_call = self._active_tools.get(event.tool_id) if tool_call: # Extract result result = event.tool_output - if hasattr(result, 'content'): + if hasattr(result, "content"): tool_call.result = str(result.content) else: tool_call.result = str(result) - + self._handler.stop_tool_call(tool_call) del self._active_tools[event.tool_id] - + # Agent step end (when no more tools to call) elif isinstance(event, AgentOutput): # Check if this is the final output (no tool calls) if not event.tool_calls and agent_invocation: # Extract response - if hasattr(event.response, 'content'): + if hasattr(event.response, "content"): agent_invocation.output_result = str(event.response.content) else: agent_invocation.output_result = str(event.response) - + self._handler.stop_agent(agent_invocation) if agent_run_id: del self._active_agents[agent_run_id] agent_invocation = None agent_run_id = None - + except Exception as e: # Clean up any active spans on error for tool_call in list(self._active_tools.values()): from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) self._handler.fail_tool_call(tool_call, error) self._active_tools.clear() - + if agent_invocation: from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) self._handler.fail_agent(agent_invocation, error) if agent_run_id: del self._active_agents[agent_run_id] - + raise def wrap_agent_run(wrapped, instance, args, kwargs): """ Wrap agent.run() to instrument workflow events. - + This creates a root agent span immediately when agent.run() is called, ensuring all LLM and tool calls inherit the same trace context. 
- + The root span is pushed onto the agent_context_stack, which allows the callback handler to retrieve it when LLM/embedding events occur. """ # Get the initial user message - user_msg = kwargs.get('user_msg') or (args[0] if args else "") - + user_msg = kwargs.get("user_msg") or (args[0] if args else "") + # Get TelemetryHandler from callback handler if available from llama_index.core import Settings from opentelemetry.util.genai.types import AgentInvocation - + telemetry_handler = None for callback_handler in Settings.callback_manager.handlers: - if hasattr(callback_handler, '_handler'): + if hasattr(callback_handler, "_handler"): telemetry_handler = callback_handler._handler break - + # Create a root agent span immediately to ensure all subsequent calls # (LLM, tools) inherit this trace context root_agent = None if telemetry_handler: from uuid import uuid4 - + # Create root agent invocation before workflow starts root_agent = AgentInvocation( name=f"agent.{type(instance).__name__}", @@ -163,29 +167,30 @@ def wrap_agent_run(wrapped, instance, args, kwargs): ) root_agent.framework = "llamaindex" root_agent.agent_name = type(instance).__name__ - + # Start the root agent span immediately # This pushes (agent_name, run_id) onto the _agent_context_stack # and stores the span in _span_registry[run_id] telemetry_handler.start_agent(root_agent) - + # Call the original run() method to get the workflow handler handler = wrapped(*args, **kwargs) - + if telemetry_handler and root_agent: # Create workflow instrumentor for detailed step tracking instrumentor = WorkflowEventInstrumentor(telemetry_handler) - + # Wrap the handler to close the root span when the workflow completes original_handler = handler - + class InstrumentedHandler: """Wrapper that closes the root agent span when workflow completes.""" + def __init__(self, original, root_span_agent): self._original = original self._root_agent = root_span_agent self._result = None - + def __await__(self): # Start background task to instrument workflow events async def stream_events(): @@ -195,14 +200,15 @@ async def stream_events(): ) except Exception as e: import logging + logger = logging.getLogger(__name__) logger.warning(f"Error instrumenting workflow events: {e}") - + asyncio.create_task(stream_events()) - + # Wait for the actual workflow to complete and return the result return self._await_impl().__await__() - + async def _await_impl(self): """Actual async implementation.""" try: @@ -211,17 +217,17 @@ async def _await_impl(self): telemetry_handler.stop_agent(self._root_agent) except Exception as e: from opentelemetry.util.genai.types import Error + telemetry_handler.fail_agent( - self._root_agent, - Error(message=str(e), type=type(e)) + self._root_agent, Error(message=str(e), type=type(e)) ) raise return self._result - + def __getattr__(self, name): # Delegate all other attributes to the original handler return getattr(self._original, name) - + handler = InstrumentedHandler(original_handler, root_agent) - + return handler diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py index 355a057..9828bb5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py @@ -22,10 +22,10 @@ def setup_telemetry(): """Setup 
OpenTelemetry with span and metric exporters (once).""" global metric_reader, instrumentor - + if metric_reader is not None: return metric_reader - + # Enable metrics os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" @@ -145,7 +145,7 @@ def test_embedding_batch(): test_embedding_single_text() print("\n" + "=" * 60 + "\n") test_embedding_batch() - + # Cleanup if instrumentor: instrumentor.uninstrument() diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py index 50324c3..3081a15 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py @@ -21,9 +21,7 @@ def setup_telemetry(): # Setup tracing trace.set_tracer_provider(TracerProvider()) tracer_provider = trace.get_tracer_provider() - tracer_provider.add_span_processor( - SimpleSpanProcessor(ConsoleSpanExporter()) - ) + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) # Setup metrics with InMemoryMetricReader metric_reader = InMemoryMetricReader() @@ -43,12 +41,8 @@ def test_with_openai(): llm = OpenAI(model="gpt-3.5-turbo") messages = [ - ChatMessage( - role=MessageRole.SYSTEM, content="You are a helpful assistant." - ), - ChatMessage( - role=MessageRole.USER, content="Say hello in exactly 5 words" - ), + ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), + ChatMessage(role=MessageRole.USER, content="Say hello in exactly 5 words"), ] response = llm.chat(messages) @@ -70,7 +64,9 @@ def test_with_openai(): completion_tokens = getattr(usage, "completion_tokens", None) total_tokens = getattr(usage, "total_tokens", None) - print(f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}") + print( + f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}" + ) print("=" * 80) @@ -94,9 +90,7 @@ def test_with_mock(): llm = MockLLM(max_tokens=50) messages = [ - ChatMessage( - role=MessageRole.SYSTEM, content="You are a helpful assistant." 
- ), + ChatMessage(role=MessageRole.SYSTEM, content="You are a helpful assistant."), ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"), ] @@ -132,8 +126,7 @@ def test_message_extraction(): # Instrument LlamaIndex instrumentor = LlamaindexInstrumentor() instrumentor.instrument( - tracer_provider=tracer_provider, - meter_provider=meter_provider + tracer_provider=tracer_provider, meter_provider=meter_provider ) print("LlamaIndex instrumentation enabled\n") @@ -174,8 +167,12 @@ def test_message_extraction(): found_token_usage = True dps = getattr(metric.data, "data_points", []) for dp in dps: - token_type = dp.attributes.get("gen_ai.token.type", "unknown") - print(f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}") + token_type = dp.attributes.get( + "gen_ai.token.type", "unknown" + ) + print( + f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}" + ) print("\n" + "=" * 80) status = [] diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py index 8c2d014..e0dd0f2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py @@ -3,6 +3,7 @@ This test validates that workflow event streaming captures agent steps and tool calls. """ + import asyncio import pytest from typing import List @@ -38,7 +39,7 @@ def setup_telemetry(): class SequenceMockLLM(MockLLM): responses: List[ChatMessage] = [] response_index: int = 0 - + def __init__(self, responses: List[ChatMessage], max_tokens: int = 256): super().__init__(max_tokens=max_tokens) self.responses = responses @@ -49,26 +50,36 @@ def chat(self, messages, **kwargs): response = self.responses[self.response_index] self.response_index += 1 from llama_index.core.base.llms.types import ChatResponse + return ChatResponse(message=response) - return ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")) + return ChatResponse( + message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.") + ) async def achat(self, messages, **kwargs): if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 from llama_index.core.base.llms.types import ChatResponse + return ChatResponse(message=response) - return ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.")) + return ChatResponse( + message=ChatMessage(role=MessageRole.ASSISTANT, content="Done.") + ) def stream_chat(self, messages, **kwargs): if self.response_index < len(self.responses): response = self.responses[self.response_index] self.response_index += 1 from llama_index.core.base.llms.types import ChatResponse + # Yield a single response chunk yield ChatResponse(message=response, delta=response.content) else: - yield ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), delta="Done.") + yield ChatResponse( + message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), + delta="Done.", + ) async def astream_chat(self, messages, **kwargs): async def gen(): @@ -76,18 +87,22 @@ async def gen(): response = self.responses[self.response_index] self.response_index += 1 from llama_index.core.base.llms.types import ChatResponse + # Yield a single response chunk yield ChatResponse(message=response, delta=response.content) else: - yield 
ChatResponse(message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), delta="Done.") - + yield ChatResponse( + message=ChatMessage(role=MessageRole.ASSISTANT, content="Done."), + delta="Done.", + ) + return gen() @pytest.mark.asyncio async def test_workflow_agent(): """Test Workflow-based agent instrumentation.""" - + print("=" * 80) print("Setting up telemetry...") print("=" * 80) @@ -96,18 +111,25 @@ async def test_workflow_agent(): # Setup Mock LLM mock_responses = [ # Step 1: Decide to multiply - ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: I need to multiply 5 by 3 first. + ChatMessage( + role=MessageRole.ASSISTANT, + content="""Thought: I need to multiply 5 by 3 first. Action: multiply -Action Input: {"a": 5, "b": 3}"""), - +Action Input: {"a": 5, "b": 3}""", + ), # Step 2: Decide to add - ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: The result is 15. Now I need to add 2 to 15. + ChatMessage( + role=MessageRole.ASSISTANT, + content="""Thought: The result is 15. Now I need to add 2 to 15. Action: add -Action Input: {"a": 15, "b": 2}"""), - +Action Input: {"a": 15, "b": 2}""", + ), # Step 3: Final Answer - ChatMessage(role=MessageRole.ASSISTANT, content="""Thought: The final result is 17. -Answer: The result is 17."""), + ChatMessage( + role=MessageRole.ASSISTANT, + content="""Thought: The final result is 17. +Answer: The result is 17.""", + ), ] Settings.llm = SequenceMockLLM(responses=mock_responses, max_tokens=256) @@ -130,7 +152,7 @@ async def test_workflow_agent(): print("\n" + "=" * 80) print("Running agent task (should see AgentInvocation -> ToolCall spans)...") print("=" * 80) - + handler = agent.run(user_msg="Calculate 5 times 3, then add 2 to the result") result = await handler From ddbb103a0d22856e0b58d624a45737790245fc3f Mon Sep 17 00:00:00 2001 From: shuningc Date: Fri, 12 Dec 2025 08:38:03 -0800 Subject: [PATCH 8/8] Adding deployment files for demo tarvel-planner app --- .../examples/travel_planner_k8s/Dockerfile | 24 ++ .../examples/travel_planner_k8s/README.md | 119 +++++++ .../examples/travel_planner_k8s/cronjob.yaml | 85 +++++ .../travel_planner_k8s/deployment.yaml | 126 +++++++ .../travel_planner_k8s/main_server.py | 331 ++++++++++++++++++ .../travel_planner_k8s/requirements.txt | 5 + .../llamaindex/workflow_instrumentation.py | 28 +- 7 files changed, 712 insertions(+), 6 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile new file mode 100644 index 0000000..93138ff --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.12-slim 
+
+WORKDIR /app
+
+# Disable telemetry during build to avoid connection errors
+ENV OTEL_SDK_DISABLED=true
+
+# Install dependencies
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt .
+RUN pip install --default-timeout=100 --retries=5 -r requirements.txt
+
+# Copy and install local instrumentation package
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex /tmp/instrumentation-llamaindex/
+RUN pip install --no-cache-dir /tmp/instrumentation-llamaindex && \
+    rm -rf /tmp/instrumentation-llamaindex
+
+# Copy application code
+COPY instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py .
+
+# Expose port
+EXPOSE 8080
+
+# Run the server
+CMD ["python", "main_server.py"]
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md
new file mode 100644
index 0000000..bdae86f
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/README.md
@@ -0,0 +1,119 @@
+# LlamaIndex Travel Planner - Kubernetes Deployment
+
+This directory contains Kubernetes deployment files for the LlamaIndex Travel Planner example application with OpenTelemetry instrumentation.
+
+## Components
+
+- **Server (`main_server.py`)**: HTTP server that exposes a `/plan` endpoint for travel planning requests using LlamaIndex ReActAgent
+- **Deployment (`deployment.yaml`)**: Kubernetes Deployment and Service configuration
+- **CronJob (`cronjob.yaml`)**: Automated load generator that sends periodic travel planning requests
+
+## Architecture
+
+```
+┌─────────────┐
+│   CronJob   │  (Load Generator)
+│   (curl)    │
+└──────┬──────┘
+       │ HTTP POST /plan
+       ▼
+┌─────────────────────────────┐
+│        Deployment           │
+│  llamaindex-travel-planner  │
+│  ┌─────────────────────┐    │
+│  │  LlamaIndex Agent   │    │
+│  │  + OTEL Instrumentation  │
+│  └─────────────────────┘    │
+└──────┬──────────────────────┘
+       │ OTLP (gRPC)
+       ▼
+┌─────────────┐
+│    OTEL     │
+│  Collector  │
+└─────────────┘
+```
+
+## Prerequisites
+
+1. Kubernetes cluster with namespace `llamaindex-travel-planner` (the namespace used by `deployment.yaml` and `cronjob.yaml`)
+2. OpenAI API key stored as secret:
+   ```bash
+   kubectl create secret generic openai-api-keys \
+     --from-literal=openai-api-key=YOUR_API_KEY \
+     -n llamaindex-travel-planner
+   ```
+3. OpenTelemetry Collector running on cluster nodes (DaemonSet)
+
+## Building the Docker Image
+
+```bash
+# From this directory
+docker build -t shuniche855/llamaindex-travel-planner:0.0.1 .
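+
+# Optional (an assumption, not part of the original instructions): when building
+# on an ARM host for amd64 cluster nodes, a cross-platform build may be needed:
+#   docker buildx build --platform linux/amd64 -t shuniche855/llamaindex-travel-planner:0.0.1 .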
+
+# Push to registry
+docker push shuniche855/llamaindex-travel-planner:0.0.1
+```
+
+## Deployment
+
+```bash
+# Deploy the server
+kubectl apply -f deployment.yaml
+
+# Deploy the load generator CronJob
+kubectl apply -f cronjob.yaml
+```
+
+## Testing
+
+### Health Check
+
+```bash
+kubectl port-forward -n llamaindex-travel-planner svc/llamaindex-travel-planner-service 8080:80
+curl http://localhost:8080/health
+```
+
+### Manual Request
+
+```bash
+curl -X POST http://localhost:8080/plan \
+  -H "Content-Type: application/json" \
+  -d '{
+    "destination": "Paris",
+    "origin": "New York",
+    "budget": 3000,
+    "duration": 5,
+    "travelers": 2,
+    "interests": ["sightseeing", "food", "culture"],
+    "departure_date": "2024-06-15"
+  }'
+```
+
+## Environment Variables
+
+Key environment variables configured in `deployment.yaml`:
+
+- `OTEL_SERVICE_NAME`: Service name for telemetry
+- `OTEL_EXPORTER_OTLP_ENDPOINT`: OTLP collector endpoint
+- `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT`: Enable message content capture
+- `OPENAI_API_KEY`: OpenAI API key (from secret)
+
+## Monitoring
+
+The application generates:
+
+- **Traces**: LlamaIndex agent execution, LLM calls, tool invocations
+- **Metrics**: LLM token usage, latency, error rates
+- **Logs**: Application logs with trace correlation
+
+View traces in your observability platform (Splunk O11y, Jaeger, etc.)
+
+## Load Generation Schedule
+
+The CronJob runs:
+
+- **Schedule**: Every 30 minutes during business hours
+- **Days**: Monday-Friday
+- **Time**: 8am-6pm PST (16:00-02:00 UTC)
+
+Adjust the schedule in `cronjob.yaml` as needed.
diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml
new file mode 100644
index 0000000..6c94887
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/cronjob.yaml
@@ -0,0 +1,85 @@
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: llamaindex-travel-planner-load-generator
+  namespace: llamaindex-travel-planner
+spec:
+  schedule: "0,30 16-23,0-2 * * 1-5"  # Every 30 min, 8am-6pm PST Mon-Fri (16:00-02:00 UTC)
+  jobTemplate:
+    spec:
+      template:
+        spec:
+          containers:
+          - name: load-generator
+            image: curlimages/curl:latest
+            command:
+            - /bin/sh
+            - -c
+            - |
+              echo "Generating LlamaIndex travel planning requests..."
+ + # Travel destinations and parameters + destinations="Tokyo Paris NewYork Sydney Barcelona Dubai London Singapore Rome Amsterdam" + origins="Boston Chicago Seattle LosAngeles Miami Denver Atlanta Portland" + budgets="2000 3000 1500 4000 2500 3500" + durations="3 5 7 10 14" + + # Interest combinations + interests_1="sightseeing food culture" + interests_2="adventure shopping food" + interests_3="culture sightseeing" + interests_4="food adventure shopping" + + # Generate a random request + RAND=$(date +%s) + dest=$(echo $destinations | cut -d' ' -f$((($RAND % 10) + 1))) + origin=$(echo $origins | cut -d' ' -f$((($RAND % 8) + 1))) + budget=$(echo $budgets | cut -d' ' -f$((($RAND % 6) + 1))) + duration=$(echo $durations | cut -d' ' -f$((($RAND % 5) + 1))) + travelers=$((($RAND % 4) + 1)) + + # Select random interests + case $(($RAND % 4)) in + 0) interests=$interests_1;; + 1) interests=$interests_2;; + 2) interests=$interests_3;; + *) interests=$interests_4;; + esac + + # Convert interests to JSON array + interest_json=$(echo $interests | awk '{ + printf "[" + for(i=1; i<=NF; i++) { + if(i>1) printf "," + printf "\"%s\"", $i + } + printf "]" + }') + + echo "Planning trip:" + echo " From: $origin" + echo " To: $dest" + echo " Budget: \$$budget" + echo " Duration: $duration days" + echo " Travelers: $travelers" + echo " Interests: $interests" + + # Make request to the LlamaIndex travel planner service + curl -X POST http://llamaindex-travel-planner-service.llamaindex-travel-planner.svc.cluster.local/plan \ + -H "Content-Type: application/json" \ + -d "{ + \"destination\": \"$dest\", + \"origin\": \"$origin\", + \"budget\": $budget, + \"duration\": $duration, + \"travelers\": $travelers, + \"interests\": $interest_json, + \"departure_date\": \"2024-06-15\" + }" \ + --max-time 600 || echo "Request failed" + + echo "Load generation completed" + restartPolicy: OnFailure + backoffLimit: 3 + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml new file mode 100644 index 0000000..77a11bf --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/deployment.yaml @@ -0,0 +1,126 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: llamaindex-travel-planner + namespace: llamaindex-travel-planner + labels: + app: llamaindex-travel-planner +spec: + replicas: 1 + selector: + matchLabels: + app: llamaindex-travel-planner + template: + metadata: + labels: + app: llamaindex-travel-planner + spec: + containers: + - name: travel-planner + image: shuniche855/llamaindex-travel-planner:0.0.9 + imagePullPolicy: Always + ports: + - containerPort: 8080 + name: http + env: + # Python unbuffered output for real-time logging + - name: PYTHONUNBUFFERED + value: "1" + # Load OpenAI API key from secret + - name: OPENAI_API_KEY + valueFrom: + secretKeyRef: + name: openai-api-keys + key: openai-api-key + # Enable OTEL SDK (disabled in Dockerfile during build) + - name: OTEL_SDK_DISABLED + value: "false" + # Service Name + - name: OTEL_SERVICE_NAME + value: "opentelemetry-python-llamaindex-travel-planner" + # Additional OTEL configuration + - name: OTEL_RESOURCE_ATTRIBUTES + value: "deployment.environment=o11y-inframon-ai" + - name: SPLUNK_OTEL_AGENT + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: 
OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://$(SPLUNK_OTEL_AGENT):4317" + - name: OTEL_EXPORTER_OTLP_PROTOCOL + value: "grpc" + - name: HOME + value: "/tmp" + - name: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE + value: "DELTA" + - name: OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED + value: "true" + - name: OTEL_PYTHON_EXCLUDED_URLS + value: "^(https?://)?[^/]+(/health)?$" + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT + value: "true" + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE + value: "SPAN_AND_EVENT" + - name: OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION + value: "true" + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS + value: "span_metric_event,splunk" + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION + value: "replace-category:SplunkEvaluationResults" + - name: OTEL_GENAI_EVAL_DEBUG_SKIPS + value: "true" + - name: OTEL_GENAI_EVAL_DEBUG_EACH + value: "false" + - name: OTEL_INSTRUMENTATION_GENAI_DEBUG + value: "true" + - name: SPLUNK_PROFILER_ENABLED + value: "true" + # Set evaluation wait time to 10 seconds (short enough to avoid health check timeout) + - name: EVAL_WAIT_SECONDS + value: "10" + - name: PORT + value: "8080" + resources: + requests: + memory: "512Mi" + cpu: "250m" + limits: + memory: "2Gi" + cpu: "1000m" + securityContext: + allowPrivilegeEscalation: false + runAsNonRoot: true + runAsUser: 1000 + capabilities: + drop: + - ALL + readOnlyRootFilesystem: false + livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 10 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 5 + periodSeconds: 5 + restartPolicy: Always +--- +apiVersion: v1 +kind: Service +metadata: + name: llamaindex-travel-planner-service + namespace: llamaindex-travel-planner +spec: + selector: + app: llamaindex-travel-planner + ports: + - name: http + port: 80 + targetPort: 8080 + type: ClusterIP diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py new file mode 100644 index 0000000..bb6d2a9 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/main_server.py @@ -0,0 +1,331 @@ +""" +Travel Planner Server using LlamaIndex ReActAgent. + +This server exposes an HTTP endpoint for travel planning requests and uses +OpenTelemetry instrumentation to capture traces and metrics. 
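+
+Example request (illustrative; the /plan handler below accepts these fields and
+falls back to defaults for any that are omitted):
+
+    curl -X POST http://localhost:8080/plan \
+        -H "Content-Type: application/json" \
+        -d '{"destination": "Paris", "origin": "New York", "duration": 5}'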
+""" + +import asyncio +import json +import os +from datetime import datetime +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Dict, Any + +from llama_index.core.agent import ReActAgent +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI +from llama_index.core import Settings + +from opentelemetry import trace, metrics +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + + +# Setup Telemetry +def setup_telemetry(): + """Initialize OpenTelemetry tracing and metrics. + + Service name and OTLP endpoint are configured via environment variables: + - OTEL_SERVICE_NAME + - OTEL_EXPORTER_OTLP_ENDPOINT + """ + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter()) + ) + + metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter()) + metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) + + +# Define Travel Planning Tools +def search_flights(origin: str, destination: str, date: str) -> str: + """Search for flights between two cities on a specific date.""" + print(f" [Tool] Searching flights from {origin} to {destination} on {date}...") + + # Simulate flight search results + flight_price = 800 + return ( + f"Found Flight UA{abs(hash(origin + destination)) % 1000}: " + f"{origin} → {destination} on {date}, " + f"Price: ${flight_price}, " + f"Departure: 10:00 AM, Arrival: 2:00 PM" + ) + + +def search_hotels(city: str, check_in: str, check_out: str) -> str: + """Search for hotels in a city for given check-in and check-out dates.""" + print(f" [Tool] Searching hotels in {city} from {check_in} to {check_out}...") + + # Simulate hotel search results + nightly_rate = 200 + return ( + f"Found Hotel Grand {city}: " + f"Available from {check_in} to {check_out}, " + f"Rate: ${nightly_rate}/night, " + f"Rating: 4.5/5, Amenities: WiFi, Breakfast, Pool" + ) + + +def search_activities(city: str) -> str: + """Search for activities and attractions in a city.""" + print(f" [Tool] Searching activities in {city}...") + + activities = [ + f"City Tour of {city} - $50", + f"Food Tour in {city} - $80", + f"Museum Pass for {city} - $40", + ] + + return f"Recommended activities: {', '.join(activities)}" + + +# Global agent instances +_flight_agent = None +_hotel_agent = None +_activity_agent = None + + +def get_flight_agent(): + """Get or create the flight search agent.""" + global _flight_agent + if _flight_agent is None: + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0) + tools = [FunctionTool.from_defaults(fn=search_flights)] + system_prompt = "You are a flight search specialist. Use the search_flights tool to find flights, then provide the result." 
+ _flight_agent = ReActAgent( + tools=tools, + llm=Settings.llm, + verbose=True, + system_prompt=system_prompt + ) + return _flight_agent + + +def get_hotel_agent(): + """Get or create the hotel search agent.""" + global _hotel_agent + if _hotel_agent is None: + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0) + tools = [FunctionTool.from_defaults(fn=search_hotels)] + system_prompt = "You are a hotel search specialist. Use the search_hotels tool to find hotels, then provide the result." + _hotel_agent = ReActAgent( + tools=tools, + llm=Settings.llm, + verbose=True, + system_prompt=system_prompt + ) + return _hotel_agent + + +def get_activity_agent(): + """Get or create the activity search agent.""" + global _activity_agent + if _activity_agent is None: + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0) + tools = [FunctionTool.from_defaults(fn=search_activities)] + system_prompt = "You are an activity recommendation specialist. Use the search_activities tool to find activities, then provide the result." + _activity_agent = ReActAgent( + tools=tools, + llm=Settings.llm, + verbose=True, + system_prompt=system_prompt + ) + return _activity_agent + + +class TravelPlannerHandler(BaseHTTPRequestHandler): + """HTTP request handler for travel planning.""" + + def do_GET(self): + """Handle GET requests (health check).""" + if self.path == '/health': + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps({'status': 'healthy'}).encode()) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + """Handle POST requests for travel planning.""" + if self.path == '/plan': + # Create a root span for the HTTP request + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span( + "POST /plan", + kind=trace.SpanKind.SERVER, + attributes={ + "http.method": "POST", + "http.target": "/plan", + "http.scheme": "http", + } + ) as span: + try: + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + request_data = json.loads(post_data.decode('utf-8')) + + # Extract parameters + destination = request_data.get('destination', 'Paris') + origin = request_data.get('origin', 'New York') + budget = request_data.get('budget', 3000) + duration = request_data.get('duration', 5) + travelers = request_data.get('travelers', 2) + interests = request_data.get('interests', ['sightseeing', 'food']) + departure_date = request_data.get('departure_date', '2024-06-01') + + print(f"\n{'='*60}") + print(f"New Travel Planning Request") + print(f"{'='*60}") + print(f"Destination: {destination}") + print(f"Origin: {origin}") + print(f"Budget: ${budget}") + print(f"Duration: {duration} days") + print(f"Travelers: {travelers}") + print(f"Interests: {', '.join(interests)}") + print(f"{'='*60}\n") + + # Calculate check-out date + from datetime import datetime, timedelta + check_in = datetime.strptime(departure_date, "%Y-%m-%d") + check_out = check_in + timedelta(days=duration) + check_out_date = check_out.strftime("%Y-%m-%d") + + # Run agents sequentially (like LangChain multi-agent approach) + async def run_agents(): + results = [] + + # 1. 
Flight specialist agent + print("\n--- Flight Specialist Agent ---") + flight_agent = get_flight_agent() + flight_query = f"Search for flights from {origin} to {destination} departing on {departure_date}" + flight_handler = flight_agent.run(user_msg=flight_query, max_iterations=3) + flight_response = await flight_handler + results.append(f"Flights: {flight_response}") + + # 2. Hotel specialist agent + print("\n--- Hotel Specialist Agent ---") + hotel_agent = get_hotel_agent() + hotel_query = f"Search for hotels in {destination} from {departure_date} to {check_out_date}" + hotel_handler = hotel_agent.run(user_msg=hotel_query, max_iterations=3) + hotel_response = await hotel_handler + results.append(f"Hotels: {hotel_response}") + + # 3. Activity specialist agent + print("\n--- Activity Specialist Agent ---") + activity_agent = get_activity_agent() + activity_query = f"Recommend activities in {destination}" + activity_handler = activity_agent.run(user_msg=activity_query, max_iterations=3) + activity_response = await activity_handler + results.append(f"Activities: {activity_response}") + + return "\n\n".join(results) + + try: + result = asyncio.run(run_agents()) + except RuntimeError as e: + if "asyncio.run() cannot be called from a running event loop" in str(e): + # Fallback for when event loop is already running + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(run_agents()) + finally: + loop.close() + else: + raise + + print(f"\n{'='*60}") + print(f"Planning Complete") + print(f"{'='*60}\n") + + # Send response + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + + response_data = { + 'status': 'success', + 'request': request_data, + 'plan': result, + 'timestamp': datetime.utcnow().isoformat() + } + + self.wfile.write(json.dumps(response_data, indent=2).encode()) + span.set_attribute("http.status_code", 200) + + except Exception as e: + print(f"Error processing request: {e}") + import traceback + traceback.print_exc() + + span.set_attribute("http.status_code", 500) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + + self.send_response(500) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps({ + 'status': 'error', + 'error': str(e) + }).encode()) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + """Override to customize logging.""" + print(f"{self.address_string()} - {format % args}") + + +def main(): + """Start the travel planner server.""" + # Check for API Key + if not os.getenv("OPENAI_API_KEY"): + print("Error: OPENAI_API_KEY environment variable is not set.") + return 1 + + # Setup telemetry + setup_telemetry() + + # Auto-instrument LlamaIndex (captures telemetry automatically via callbacks) + LlamaindexInstrumentor().instrument() + + # Start HTTP server + port = int(os.getenv("PORT", "8080")) + server = HTTPServer(('0.0.0.0', port), TravelPlannerHandler) + + print(f"\n{'='*60}") + print(f"Travel Planner Server Starting") + print(f"{'='*60}") + print(f"Port: {port}") + print(f"Health check: http://localhost:{port}/health") + print(f"Planning endpoint: POST http://localhost:{port}/plan") + print(f"{'='*60}\n") + + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nShutting down server...") + server.shutdown() + + # Flush telemetry + provider = trace.get_tracer_provider() + if hasattr(provider, 'force_flush'): + provider.force_flush() + if 
hasattr(provider, 'shutdown'): + provider.shutdown() + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt new file mode 100644 index 0000000..27af852 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/examples/travel_planner_k8s/requirements.txt @@ -0,0 +1,5 @@ +llama-index-core>=0.14.0 +llama-index-llms-openai>=0.6.0 +opentelemetry-api>=1.27.0 +opentelemetry-sdk>=1.27.0 +opentelemetry-exporter-otlp-proto-grpc>=1.27.0 diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py index bce3dce..f3898e5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -145,6 +145,7 @@ def wrap_agent_run(wrapped, instance, args, kwargs): # Get TelemetryHandler from callback handler if available from llama_index.core import Settings from opentelemetry.util.genai.types import AgentInvocation + from opentelemetry import trace, context telemetry_handler = None for callback_handler in Settings.callback_manager.handlers: @@ -155,6 +156,7 @@ def wrap_agent_run(wrapped, instance, args, kwargs): # Create a root agent span immediately to ensure all subsequent calls # (LLM, tools) inherit this trace context root_agent = None + parent_context = None if telemetry_handler: from uuid import uuid4 @@ -172,11 +174,15 @@ def wrap_agent_run(wrapped, instance, args, kwargs): # This pushes (agent_name, run_id) onto the _agent_context_stack # and stores the span in _span_registry[run_id] telemetry_handler.start_agent(root_agent) + + # Capture the current context (which includes the active span) + # so we can propagate it to async tasks + parent_context = context.get_current() # Call the original run() method to get the workflow handler handler = wrapped(*args, **kwargs) - if telemetry_handler and root_agent: + if telemetry_handler and root_agent and parent_context: # Create workflow instrumentor for detailed step tracking instrumentor = WorkflowEventInstrumentor(telemetry_handler) @@ -186,18 +192,24 @@ def wrap_agent_run(wrapped, instance, args, kwargs): class InstrumentedHandler: """Wrapper that closes the root agent span when workflow completes.""" - def __init__(self, original, root_span_agent): + def __init__(self, original, root_span_agent, ctx): self._original = original self._root_agent = root_span_agent self._result = None + self._parent_context = ctx def __await__(self): # Start background task to instrument workflow events async def stream_events(): try: - await instrumentor.instrument_workflow_handler( - self._original, str(user_msg) - ) + # Attach the parent context before processing workflow events + token = context.attach(self._parent_context) + try: + await instrumentor.instrument_workflow_handler( + self._original, str(user_msg) + ) + finally: + context.detach(token) except Exception as e: import logging @@ -211,6 +223,8 @@ async def stream_events(): async def _await_impl(self): """Actual 
async implementation.""" + # Attach the parent context to ensure proper span hierarchy + token = context.attach(self._parent_context) try: self._result = await self._original self._root_agent.output_result = str(self._result) @@ -222,12 +236,14 @@ async def _await_impl(self): self._root_agent, Error(message=str(e), type=type(e)) ) raise + finally: + context.detach(token) return self._result def __getattr__(self, name): # Delegate all other attributes to the original handler return getattr(self._original, name) - handler = InstrumentedHandler(original_handler, root_agent) + handler = InstrumentedHandler(original_handler, root_agent, parent_context) return handler